-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add NGINX Support for Auth Listener #1
Comments
Have gone with email auth for now but we can add an http server later. I can't find any VSA webhooks anyway so there is only the auth to listen for. |
Hello again. I don't know whether this will help with what you were planning to implement, but after some Google searching, copy-and-paste and a few tweaks, here is a "sort of working" new VSA_Auth.py that runs from ../PythonVSA. I hope it helps... cheers
import configparser
from types import resolve_bases
import requests
import os,sys
import logging
import datetime
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from http.server import HTTPServer, BaseHTTPRequestHandler
from http import HTTPStatus
from time import sleep
import re
from imaplib import IMAP4_SSL
import email
import socket
from PythonVSA import Auth
from http.server import HTTPServer, BaseHTTPRequestHandler, SimpleHTTPRequestHandler
import ssl
import requests
import pathlib
import json
from urllib.parse import urlparse, parse_qs
from datetime import datetime, timedelta
import ipaddress
# def start_server():
# # Setup stuff here...
# server.serve_forever()
# # start the server in a background thread
# thread.start_new_thread(start_server)
# print('The server is running but my script is still executing!')
# https://gist.github.com/bloodearnest/9017111a313777b9cce5
def _generateSelfSignedCert(hostname, ip_addresses=None, key=None):
    """Create a self-signed certificate for ``hostname`` and write it to disk.

    Two PEM files are written to the current working directory:
    ``cert_key.pem`` (the unencrypted private key) and
    ``cert_fullchain.pem`` (the certificate followed by the same key).

    Args:
        hostname: Name used as the certificate's common name and first
            subject-alternative-name entry.
        ip_addresses: Optional iterable of IP address strings; each one is
            added to the SAN both as a DNS name and as an IP address.
        key: Optional existing private key. When ``None`` a new 2048-bit RSA
            key is generated.
    """
    from cryptography import x509
    from cryptography.x509.oid import NameOID
    from cryptography.hazmat.primitives import hashes
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.primitives.asymmetric import rsa

    # Only generate key material when the caller did not supply any.
    if key is None:
        key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=2048,
            backend=default_backend(),
        )

    # Subject and issuer are the same name because the cert signs itself.
    subject = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, hostname)])

    # The hostname always goes into the SAN; each IP is listed twice because
    # some TLS stacks look for a DNS entry and others for an IP entry.
    san_entries = [x509.DNSName(hostname)]
    for address in ip_addresses or ():
        san_entries.extend((
            x509.DNSName(address),
            x509.IPAddress(ipaddress.ip_address(address)),
        ))

    issued_at = datetime.utcnow()
    expires_at = issued_at + timedelta(days=10*365)

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(subject)
    builder = builder.issuer_name(subject)
    builder = builder.public_key(key.public_key())
    builder = builder.serial_number(1000)
    builder = builder.not_valid_before(issued_at)
    builder = builder.not_valid_after(expires_at)
    # path_length=0: the certificate may sign itself but no further certs.
    builder = builder.add_extension(
        x509.BasicConstraints(ca=True, path_length=0), False)
    builder = builder.add_extension(
        x509.SubjectAlternativeName(san_entries), False)
    certificate = builder.sign(key, hashes.SHA256(), default_backend())

    certificate_text = certificate.public_bytes(
        encoding=serialization.Encoding.PEM).decode('UTF-8')
    key_text = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    ).decode('UTF-8')

    with open('cert_key.pem', 'w') as key_out:
        key_out.write(key_text)
    with open('cert_fullchain.pem', 'w') as chain_out:
        chain_out.write(certificate_text + key_text)
class Callback_LocalHTTPS(BaseHTTPRequestHandler):
    """Request handler that receives the OAuth2 redirect and stores the token.

    The handler reads these module-level names, which the script entry point
    is expected to have set before the server starts: ``config``,
    ``fullpath``, ``vsa_uri``, ``redirect_uri``, ``client_id`` and
    ``client_secret``.
    """

    @staticmethod
    def _extract_code(path):
        """Return the ``code`` query parameter of ``path``, or ``None``."""
        codes = parse_qs(urlparse(path).query).get('code')
        return codes[0] if codes else None

    def _reply(self, status, body):
        """Send a complete HTML response with the given status and body."""
        self.send_response(status)
        self.send_header("Content-Type", "text/html")
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self):
        """Exchange the authorization code in the request for a token.

        On success the token fields are written to the ``[Auth]`` section of
        the config file and the browser gets a 200 response. A request
        without a ``code`` parameter gets a 400, and a failed token exchange
        gets a 502, so the browser is never left without an answer.
        """
        print('GET Request received...')

        # self.path holds only the request target, so no HTTP-version suffix
        # has to be stripped from the parsed value.
        oauthcode = self._extract_code(self.path)
        if oauthcode is None:
            print("No code found...\n")
            self._reply(HTTPStatus.BAD_REQUEST,
                        b"No authorization code found in the request.")
            return

        data = {
            "grant_type": "authorization_code",
            "code": oauthcode,
            "redirect_uri": redirect_uri,
            "client_id": client_id,
            "client_secret": client_secret,
        }
        try:
            response = requests.post(f"{vsa_uri}/api/v1.0/authorize", data=data)
            ktoken = json.loads(response.text)
            # A config file without an [Auth] section must not abort the save.
            if not config.has_section('Auth'):
                config.add_section('Auth')
            config['Auth']['access_token'] = ktoken['access_token']
            config['Auth']['token_type'] = ktoken['token_type']
            config['Auth']['expires_in'] = str(ktoken['expires_in'])
            config['Auth']['refresh_token'] = ktoken['refresh_token']
            with open(fullpath, 'w') as configfile:
                print(f"{config['Auth']['access_token']}")
                config.write(configfile)
        except Exception as e:
            # Broad on purpose: this is the request boundary and whatever
            # went wrong (network, bad JSON, missing field, file error) the
            # browser must still be told that no token was stored.
            print(f"No Token found... {e}")
            self._reply(HTTPStatus.BAD_GATEWAY,
                        b"Token exchange failed... check the terminal for details.")
            return

        self._reply(HTTPStatus.OK,
                    b"OAuth2 Token generated... you can go back to terminal...")
        # KeyboardInterrupt is no longer swallowed by this handler, so a
        # single <CTRL>+<C> now propagates and stops the server.
        input("Access Token written on 'config.ini', Press <CTRL>+<C> to exit")
def doInitialAuth(code, config):
    """Exchange an authorization code for tokens and persist them.

    Posts ``code`` to ``<vsa_uri>/api/v1.0/authorize``, stores the JSON
    response as the ``[Auth]`` section of ``config`` together with a
    ``refreshed_at`` timestamp, writes the config to the module-level
    ``fullpath`` and finally calls ``Auth.doRefresh`` with the refresh token.

    Args:
        code: Authorization code returned by the VSA login redirect.
        config: ``ConfigParser`` holding the ``[VSA]`` and ``[Listener]``
            settings; it is updated in place.

    Raises:
        SystemExit: With status 1 when the server answers with an error
            status or the response carries no refresh token.
    """
    # Imported locally under a private name: at module level the name
    # `datetime` is rebound to the class by a later from-import, so
    # `datetime.datetime.now()` would raise AttributeError here.
    from datetime import datetime as _datetime

    vsa_uri = config['VSA']['vsa_uri']
    client_id = config['VSA']['client_id']
    client_secret = config['VSA']['client_secret']
    authendpoint = vsa_uri + "/api/v1.0/authorize"
    redirect_uri = config['Listener']['redirect_uri']

    r = requests.post(authendpoint, json={
        "grant_type": "authorization_code",
        "code": code,
        "redirect_uri": redirect_uri,
        "client_id": client_id,
        "client_secret": client_secret})
    print("First refresh token:")
    print(r.text)

    # Any 4xx/5xx answer means there is no token to read, not just a 400.
    if r.status_code >= 400:
        print("An error has occurred")
        print(r.text)
        raise SystemExit(1)

    token = r.json()
    refreshtoken = token.get('refresh_token')
    if not refreshtoken:
        print("An error has occurred")
        print(r.text)
        raise SystemExit(1)

    print("Got token:")
    print(refreshtoken)
    print("Response code:")
    print(str(r.status_code))

    config['Auth'] = token
    config['Auth']['refreshed_at'] = _datetime.now().strftime("%Y%m%d%H%M")
    with open(fullpath, 'w') as configfile:
        config.write(configfile)
    Auth.doRefresh(refreshtoken)
def _extract_auth_code(payload):
    """Return the authorization code found in an e-mail body, or ``None``.

    Two URL shapes are recognised anywhere in the text: a quoted-printable
    one where the ``=`` after ``code`` is followed by ``3D`` and a 32
    character code, and a plain one with a 32 character code.
    """
    quoted = re.search(r'https://.*\/\?code(=[\w\d]{34})', payload)
    if quoted:
        # Drop the quoted-printable "=3D"; lstrip covers a plain "=".
        return quoted.group(1).replace("=3D", "").lstrip("=")
    raw = re.search(r'https://.*\/\?code(=[\w\d]{32})', payload)
    if raw:
        return raw.group(1).lstrip("=")
    return None


def getInfobyMail():
    """Obtain the authorization code through an e-mail round trip.

    Sends the login link to ``smtp_emailto``, waits, then polls the IMAP
    inbox for unseen messages (up to five extra times) and looks for the
    redirect URL in each one. When a code is found ``doInitialAuth`` is
    called with it. All mail settings, ``urlforuser`` and ``config`` are read
    from module-level names.
    """
    msg = MIMEMultipart('mixed')
    msg['Subject'] = "PythonVSA Authentication"
    msg['From'] = smtp_emailfrom
    msg['To'] = smtp_emailto
    text = f"""\
Please follow this link to authenticate your new integration: {urlforuser}
Once you have authorized you will be redirected to a page that doesn't load/resolve. Copy the address from your address bar and reply to this email with it."""
    msg.attach(MIMEText(text))
    print(f"Trying to connect to {urlforuser}...")

    # The client gets its own name: assigning to `smtp_server` here would
    # make that name local and the read of the setting would fail.
    with smtplib.SMTP(smtp_server, smtp_port) as mailer:
        mailer.ehlo()
        mailer.starttls()
        mailer.ehlo()
        mailer.login(smtp_username, smtp_password)
        mailer.sendmail(smtp_emailfrom, smtp_emailto, msg.as_string())

    print(f"Waiting {imap_refresh_interval} seconds to give a chance to respond.")
    sleep(imap_refresh_interval)

    connection = IMAP4_SSL(imap_server, imap_port)
    connection.login(imap_username, imap_password)
    connection.select('INBOX')
    code = None
    try:
        typ, data = connection.search(None, '(UNSEEN)')
        if data == [b'']:
            print(f"No emails found. Checking again every {imap_refresh_interval} seconds.")
            for remaining in range(4, -1, -1):
                print(f"Checking {remaining} more times.")
                sleep(imap_refresh_interval)
                typ, data = connection.search(None, '(UNSEEN)')
                if data[0] != b'':
                    break

        for num in data[0].split():
            typ, fetched = connection.fetch(num, '(RFC822)')
            try:
                reply = email.message_from_bytes(fetched[1][1])
            except IndexError:
                reply = email.message_from_bytes(fetched[0][1])
            connection.store(num, '+FLAGS', '\\Seen')

            payload = reply.get_payload()
            if not isinstance(payload, str):
                # Multipart or empty bodies cannot be searched as text.
                print("It appears we have a message with a format we can't understand. Deleting.")
                continue

            code = _extract_auth_code(payload)
            if code is not None:
                # Stop at the first usable reply; the connection is closed
                # below, so no further fetches may be issued.
                break
            print("Didn't find URL. Deleting.")
    finally:
        # Always release the mailbox, whether or not a code was found.
        connection.close()
        connection.logout()

    if code is not None:
        doInitialAuth(code, config)
# Adapted from an example found via Google; the original source was not recorded.
def startLocalHTTPSCallback(config):
    """Serve the OAuth2 callback over HTTPS until interrupted.

    Binds an ``HTTPServer`` to ``[Listener] listen_ip`` / ``listen_port``
    from ``config``, wraps its socket in TLS using ``./cert_key.pem`` and
    ``./cert_fullchain.pem``, and blocks in ``serve_forever``.

    Args:
        config: ``ConfigParser`` providing the ``[Listener]`` section.
    """
    ssl_key = pathlib.Path("./cert_key.pem").expanduser()
    ssl_cert = pathlib.Path("./cert_fullchain.pem").expanduser()
    print("Setting Up Local Callback Server...")

    listen_address = (
        config['Listener']['listen_ip'],
        int(config['Listener']['listen_port']),
    )
    httpd = HTTPServer(listen_address, Callback_LocalHTTPS)
    try:
        # ssl.wrap_socket() was removed in Python 3.12; an SSLContext is the
        # supported way to wrap a server socket.
        tls_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        tls_context.load_cert_chain(certfile=str(ssl_cert), keyfile=str(ssl_key))
        httpd.socket = tls_context.wrap_socket(httpd.socket, server_side=True)
        print(f"Server started at localhost: {config['Listener']['listen_ip']}")
        httpd.serve_forever()
    finally:
        # Release the listening socket on any exit, including <CTRL>+<C>.
        httpd.server_close()
if __name__ == "__main__":
    # Script entry point: load config.ini, print the registration steps and
    # the login URL, then start the local HTTPS callback server.

    # Dev var
    os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1'
    logging.getLogger().setLevel(logging.DEBUG)

    # Init config
    config = configparser.ConfigParser()
    fullpath = os.getcwd() + "/PythonVSA/config.ini"
    config.read(fullpath, encoding='utf-8')

    # Settings the callback flow cannot run without: stop here instead of
    # failing later with a NameError.
    try:
        client_id = config['VSA']['client_id']
        client_secret = config['VSA']['client_secret']
        vsa_uri = config['VSA']['vsa_uri']
        redirect_uri = config['Listener']['redirect_uri']
        listen_ip = config['Listener']['listen_ip']
        listen_port = config['Listener']['listen_port']
    except KeyError as missing:
        print("A required variable is missing from the configuration. Please check config.ini. ")
        print(f"Missing entry: {missing}")
        raise SystemExit(1)

    # Mail settings are only read by the e-mail flow, which is disabled
    # below, so a missing or invalid entry is reported but not fatal.
    try:
        smtp_username = config['Email']['smtp_username']
        smtp_password = config['Email']['smtp_password']
        smtp_emailfrom = config['Email']['smtp_emailfrom']
        smtp_emailto = config['Email']['smtp_emailto']
        smtp_server = config['Email']['smtp_server']
        smtp_port = int(config['Email']['smtp_port'])
        imap_username = config['Email']['imap_username']
        imap_password = config['Email']['imap_password']
        imap_email = config['Email']['imap_email']
        imap_server = config['Email']['imap_server']
        imap_port = int(config['Email']['imap_port'])
        imap_refresh_interval = int(config['Email']['imap_refresh_interval'])
    except KeyError:
        print("A required variable is missing from the configuration. Please check config.ini. ")
    except ValueError as invalid:
        print(f"A numeric e-mail setting in config.ini is invalid: {invalid}")

    print("Go to your VSA under System > Server Configuration > OAuth Clients and [+Register Client]")
    print("- Choose anything to your 'Client Name', prefer something that will make sense like PythonVSA")
    print("- By now, it will work only if you run VSA_Auth.py from your computer, so the 'Redirect_Url' should be https://localhost:8443")
    print("- For eMail, use something that is working and you have access and configured in the `config.ini`")
    input("When you are ready, please, press <ENTER>")

    urlforuser = vsa_uri + "/vsapres/web20/core/login.aspx?response_type=code&redirect_uri=" + redirect_uri + "&client_id=" + client_id
    print("")
    print("Now, please visit this link and copy the entire resulting URL.")
    print(urlforuser)

    # TODO: Test this and configure options to use argparse
    # getInfobyMail()

    # The local server needs a certificate before it can accept HTTPS.
    print("Generating Temporary SSL Certificates for the local server...")
    _generateSelfSignedCert('localhost')
    startLocalHTTPSCallback(config)
No description provided.
The text was updated successfully, but these errors were encountered: