# EE-status-v3/EagleEyev3/__init__.py
#
# 141 lines
# 4.8 KiB
# Python
# Raw Normal View History
#
# 2023-05-18 15:40:02 +00:00
import json
import logging
import requests
from flask import Flask, request
from .settings import *
logging.basicConfig(level=logging.INFO)
class EagleEyev3():
    """Client for the OAuth login flow of the Eagle Eye Networks v3 API.

    An instance holds the OAuth client credentials and tokens and, once a
    login has cascaded successfully, the per-user API hostname and the record
    of the logged-in user.
    """

    # Seconds to wait on each HTTP call before `requests` raises a Timeout.
    # Without an explicit timeout a stalled server blocks the caller forever.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        """Create every attribute with an empty default, then load settings."""
        # OAuth client configuration and tokens.
        self.client_id = None
        self.client_secret = None
        self.access_token = None
        self.refresh_token = None
        self.redirect_uri = None
        self.server_host = None
        self.server_port = None

        # State filled in by the login cascade.
        self.user_base_url = None
        self.current_user = None
        self.base_url = None

        # Resource collections; nothing in this class populates them yet.
        self.users = []
        self.bridges = []
        self.cameras = []
        self.accounts = []

        # Loaded last so that every attribute exists before settings are
        # applied and no default assigned above can overwrite a loaded value.
        self._load_vars_from_settings()

    def _load_vars_from_settings(self):
        """Load variables from the settings module.

        NOTE(review): `settings` is presumably the sibling `settings`
        submodule, which the import system binds into this package's
        namespace when the package imports from `.settings` - confirm that
        settings.py does not export a different object under that name.
        """
        self.client_id = settings.client_id
        self.client_secret = settings.client_secret
        self.server_host = settings.server_host
        self.server_port = settings.server_port

        # Combine server_protocol, server_host, and server_port to make the redirect_uri
        # Note: Please see the note in settings.py about trailing slashes and modify this line if needed
        self.redirect_uri = f"{settings.server_protocol}://{settings.server_host}:{settings.server_port}"

    def _auth_headers(self):
        """Return the bearer-token headers shared by the authenticated calls."""
        return {
            "Authorization": f"Bearer {self.access_token}",
            "Accept": "application/json"
        }

    @staticmethod
    def _decode_json(response):
        """Return the decoded JSON body of `response`, or None if it is not JSON.

        Error responses are not guaranteed to carry a JSON body; decoding
        defensively lets callers report a failure instead of raising.
        """
        try:
            return response.json()
        except ValueError:
            # Every JSON decode error `requests` can raise subclasses ValueError.
            return None

    def login_tokens(self, code=None, cascade=True):
        """Obtains login tokens using the authorization code.

        Args:
            code (str): The authorization code.
            cascade (bool): Indicates whether to cascade and get the base URL and current user information.

        Returns:
            dict: Dictionary containing the success status, response HTTP status code, data, and current user information.
                `data` is None when the response body is not valid JSON.
        """
        url = "https://auth.eagleeyenetworks.com/oauth2/token"
        # Passed as `params` so that requests URL-encodes each value; the
        # redirect URI contains "://" and ":" and the code is opaque text.
        # Note the trailing slash of redirect_uri, make sure it matches the whitelist
        params = {
            "grant_type": "authorization_code",
            "scope": "vms.all",
            "code": code,
            "redirect_uri": self.redirect_uri,
        }

        # Send a POST request to obtain login tokens
        response = requests.post(
            url,
            params=params,
            auth=(self.client_id, self.client_secret),
            timeout=self.REQUEST_TIMEOUT,
        )
        response_json = self._decode_json(response)
        logging.info("%s in login_tokens", response.status_code)

        success = response.status_code == 200 and response_json is not None
        if success:
            self.access_token = response_json['access_token']
            self.refresh_token = response_json['refresh_token']
            if cascade:
                self.get_base_url()

        return {
            "success": success,
            "response_http_status": response.status_code,
            "data": response_json,
            'current_user': self.current_user
        }

    def get_base_url(self, cascade=True):
        """Obtains the base URL for the user.

        Args:
            cascade (bool): Indicates whether to cascade and get the current user information.

        Returns:
            dict: Dictionary containing the success status, response HTTP status code, and data.
                `data` is None when the response body is not valid JSON.
        """
        url = "https://api.eagleeyenetworks.com/api/v3.0/clientSettings"

        # Send a GET request to obtain the base URL
        response = requests.get(
            url,
            headers=self._auth_headers(),
            timeout=self.REQUEST_TIMEOUT,
        )
        response_json = self._decode_json(response)
        logging.info("%s in get_base_url", response.status_code)

        success = response.status_code == 200 and response_json is not None
        if success:
            if 'httpsBaseUrl' in response_json and 'hostname' in response_json['httpsBaseUrl']:
                self.user_base_url = response_json['httpsBaseUrl']['hostname']
                # Only look up the user once the host to ask is known.
                if cascade:
                    self.get_current_user()

        return {
            "success": success,
            "response_http_status": response.status_code,
            "data": response_json
        }

    def get_current_user(self):
        """Obtains the information of the current user.

        Returns:
            dict: Dictionary containing the success status, response HTTP status code, and data.
                `data` is None when the response body is not valid JSON.
        """
        url = f"https://{self.user_base_url}/api/v3.0/users/self"

        # Send a GET request to obtain the current user information
        response = requests.get(
            url,
            headers=self._auth_headers(),
            timeout=self.REQUEST_TIMEOUT,
        )
        response_json = self._decode_json(response)
        logging.info("%s in get_current_user", response.status_code)

        success = response.status_code == 200 and response_json is not None
        if success:
            self.current_user = response_json

        return {
            "success": success,
            "response_http_status": response.status_code,
            "data": response_json
        }