From 6cab281987f52845f39ebf38036111df82740024 Mon Sep 17 00:00:00 2001 From: Mark Cotton Date: Wed, 19 Jul 2023 23:03:29 -0600 Subject: [PATCH] Creating a package that I can pip install locally, using config object from calling app instead of settings. Version 0.0.4 --- LICENSE | 7 + README.md | 14 +- pyproject.toml | 25 ++ src/EagleEyev3/__init__.py | 651 +++++++++++++++++++++++++++++++++++++ src/EagleEyev3/settings.py | 14 + 5 files changed, 700 insertions(+), 11 deletions(-) create mode 100644 LICENSE create mode 100644 pyproject.toml create mode 100644 src/EagleEyev3/__init__.py create mode 100644 src/EagleEyev3/settings.py diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..b6b3d94 --- /dev/null +++ b/LICENSE @@ -0,0 +1,7 @@ +Copyright © 2023 + +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. \ No newline at end of file diff --git a/README.md b/README.md index abed330..421ba8c 100644 --- a/README.md +++ b/README.md @@ -1,15 +1,7 @@ -# EagleEyev3 # +# EE-status-v3 # ## Summary ## -This is a python package for working with the Eagle Eye Networks v3 API. It is all about syntatical sugar and making things a little nicer to work with. +This is a python webapp for getting status history for your cameras. ## Settings File ## -There is file `settings.py` that is needed to run. Please reach out to api_support@een.com to get the necessary credentials. - -## Ideas on how to use ## - -I encluded an example Flask server in `server.py`. This shows how to wire it up to a login route and handle the callback. - -You can also use this in a script by running in iPython in interactive mode `ipython -i server.py`. After you complete to login process, you can hit `CTRL-C` to stop the Flask server. You can then look at the `een` object to get all the instance variables. - -There is also a `Playground.ipynb` that you can run in Jupyter Notebook. +There is file `settings.py` that is needed to run. You can create your application and setup credentials at: [https://developerv3.eagleeyenetworks.com/page/my-application-html](my applications). You can also reach out to api_support@een.com for help. diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..08ccb51 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,25 @@ +[build-system] +requires = ["setuptools>=61.0.0", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "EagleEyev3" +version = "0.0.4" +description = "Helper library for Eagle Eye Networks API v3" +readme = "README.md" +authors = [{ name = "Mark Cotton", email = "mcotton@een.com" }] +license = { file = "LICENSE" } +classifiers = [ + "License :: OSI Approved :: MIT License", + "Programming Language :: Python", + "Programming Language :: Python :: 3", +] +keywords = ["EagleEye"] +dependencies = [ + "pytz", + "requests", +] +requires-python = ">=3.6" + +[project.urls] +Homepage = "https://mcotton.space" diff --git a/src/EagleEyev3/__init__.py b/src/EagleEyev3/__init__.py new file mode 100644 index 0000000..f8fc1a6 --- /dev/null +++ b/src/EagleEyev3/__init__.py @@ -0,0 +1,651 @@ +import json +import logging +import requests +from .settings import * + +from datetime import datetime, timedelta +from pytz import timezone + +from io import BytesIO + +logging.basicConfig(level=logging.INFO) + + + +class EagleEyev3(): + """ + Class representing the EagleEyev3 client. + """ + + __version__ = "0.0.4" + __all__ = ['EagleEyev3', 'Device', 'Camera'] + + def __init__(self, config=None): + """ + Initializes the EagleEyev3 client object. + """ + self.client_id = None + self.client_secret = None + self.access_token = None + self.refresh_token = None + self.redirect_uri = None + + keys_to_check = ['client_id', 'client_secret', 'server_protocol', 'server_host', 'server_port', 'server_path'] + if config and all([name in config for name in keys_to_check]): + self._load_vars_from_settings(config) + else: + if config is None: + logging.warn("config is None or was not passed into constructor") + else: + logging.error("config is missing keys") + + self.user_base_url = None + self.current_user = None + self.users = [] + self.bridges = [] + self.cameras = [] + self.switches = [] + self.users = [] + self.accounts = [] + self.user_tz_obj = None + + self.lazy_login = False + + if self.lazy_login: + try: + self._load_access_token() + except FileNotFoundError as e: + logging.warn("self.lazy_login is set to {self.lazy_login} but could not find .lazy_login file to load") + + def _load_vars_from_settings(self, config={}): + """ + Load variables from the settings module. + """ + self.client_id = config['client_id'] + self.client_secret = config['client_secret'] + self.server_protocol = config['server_protocol'] + self.server_host = config['server_host'] + self.server_port = config['server_port'] + self.server_path = config['server_path'] + + # Combine server_protocol, server_host, and server_port to make the redirect_uri + # Note: Please see the note in settings.py about trailing slashes and modify this line if needed + + self.redirect_uri = f"{self.server_protocol}://{self.server_host}:{self.server_port}/{self.server_path}" + + + def _save_access_token(self): + with open(".lazy_login", "w") as json_file: + json.dump({ + 'access_token': self.access_token, + 'refresh_token': self.refresh_token, + 'current_user': self.current_user + }, json_file) + + def _load_access_token(self): + with open(".lazy_login", "r") as json_file: + saved = json.load(json_file) + if 'access_token' in saved: + self.access_token = saved['access_token'] + if 'refresh_token' in saved: + self.refresh_token = saved['refresh_token'] + + self.get_base_url(cascade=True) + + + def time_now(self): + return datetime.now(tz=self.user_tz_obj).isoformat(timespec='milliseconds') + + + def time_before(self, ts=None, hours=6): + if ts == None: + ts = datetime.now(tz=self.user_tz_obj) + + if type(ts) == str: + ts = datetime.fromisoformat(ts) + + return (ts - timedelta(hours=hours)).isoformat(timespec='milliseconds') + + + + def login_tokens(self, code=None, cascade=True): + """ + Obtains login tokens using the authorization code. + + Args: + code (str): The authorization code. + cascade (bool): Indicates whether to cascade and get the base URL and current user information. + + Returns: + dict: Dictionary containing the success status, response HTTP status code, data, and current user information. + """ + baseUrl = "https://auth.eagleeyenetworks.com/oauth2/token" + pathUrl = f"?grant_type=authorization_code&scope=vms.all&code={code}&redirect_uri={self.redirect_uri}" # Note the trailing slash, make sure it matches the whitelist + url = baseUrl + pathUrl + + # Send a POST request to obtain login tokens + response = requests.post(url, auth=(self.client_id, self.client_secret)) + response_json = response.json() + + logging.info(f"{response.status_code} in login_tokens") + + if response.status_code == 200: + success = True + self.access_token = response_json['access_token'] + self.refresh_token = response_json['refresh_token'] + + if self.lazy_login: + self._save_access_token() + + if cascade: + self.get_base_url() + else: + success = False + + return { + "success": success, + "response_http_status": response.status_code, + "data": response_json, + 'current_user': self.current_user + } + + def logout(self): + """ + Revokes token. + + Returns: + dict: Dictionary containing the success status, response HTTP status code, and data. + """ + url = "https://auth.eagleeyenetworks.com/oauth2/revoke" + + payload = { + "token": self.access_token, + "token_type_hint": "access_token" + } + + headers = { + "Authorization": f"Bearer {self.access_token}", + "Accept": "application/json", + "Content-type": "application/json" + } + + # Send a POST request to obtain the base URL + response = requests.post(url, json=payload, headers=headers) + response_json = response.json() + + logging.info(f"{response.status_code} in logout") + + if response.status_code == 200: + success = True + else: + success = False + logging.info(f"call to logout: {response_json}") + + self.access_token = None + self.refresh_token = None + + return { + "success": success, + "response_http_status": response.status_code, + "data": response_json + } + + def get_base_url(self, cascade=True): + """ + Obtains the base URL for the user. + + Args: + cascade (bool): Indicates whether to cascade and get the current user information. + + Returns: + dict: Dictionary containing the success status, response HTTP status code, and data. + """ + url = "https://api.eagleeyenetworks.com/api/v3.0/clientSettings" + headers = { + "Authorization": f"Bearer {self.access_token}", + "Accept": "application/json" + } + + # Send a GET request to obtain the base URL + response = requests.get(url, headers=headers) + response_json = response.json() + + logging.info(f"{response.status_code} in get_base_url") + + if response.status_code == 200: + success = True + if 'httpsBaseUrl' in response_json and 'hostname' in response_json['httpsBaseUrl']: + self.user_base_url = response_json['httpsBaseUrl']['hostname'] + if cascade: + self.get_current_user() + else: + success = False + + return { + "success": success, + "response_http_status": response.status_code, + "data": response_json + } + + def get_current_user(self): + """ + Obtains the information of the current user. + + Returns: + dict: Dictionary containing the success status, response HTTP status code, and data. + """ + url = f"https://{self.user_base_url}/api/v3.0/users/self?include=timeZone" + headers = { + "Authorization": f"Bearer {self.access_token}", + "Accept": "application/json" + } + + # Send a GET request to obtain the current user information + response = requests.get(url, headers=headers) + response_json = response.json() + + logging.info(f"{response.status_code} in get_current_user") + + if response.status_code == 200: + success = True + self.current_user = response_json + self.user_tz_obj = timezone(response_json['timeZone']['timeZone']) + else: + success = False + + return { + "success": success, + "response_http_status": response.status_code, + "data": response_json + } + + def get_list_of_users(self): + """ + Obtains the list of users. + + Returns: + dict: Dictionary containing the success status, response HTTP status code, and data. + """ + url = f"https://{self.user_base_url}/api/v3.0/users?include=timeZone" + headers = { + "Authorization": f"Bearer {self.access_token}", + "Accept": "application/json" + } + + response = requests.get(url, headers=headers) + response_json = response.json() + + logging.info(f"{response.status_code} in get_list_of_users") + + if response.status_code == 200: + success = True + self.users = [i for i in response_json['results']] + else: + success = False + + return { + "success": success, + "response_http_status": response.status_code, + "data": response_json + } + + def get_list_of_cameras(self): + """ + Obtains the list of cameras. + + Returns: + dict: Dictionary containing the success status, response HTTP status code, and data. + """ + url = f"https://{self.user_base_url}/api/v3.0/cameras?include=status" + headers = { + "Authorization": f"Bearer {self.access_token}", + "Accept": "application/json" + } + + response = requests.get(url, headers=headers) + response_json = response.json() + + logging.info(f"{response.status_code} in get_list_of_cameras") + + if response.status_code == 200: + success = True + self.cameras = [ + Camera(id=i['id'],\ + name=i['name'],\ + status=i['status'],\ + account_id=i['accountId'],\ + bridge_id=i['bridgeId'],\ + user_base_url=self.user_base_url,\ + een_instance=self) + for i in response_json['results']] + for camera in self.cameras: + camera.user_base_url = self.user_base_url + else: + success = False + + return { + "success": success, + "response_http_status": response.status_code, + "data": response_json + } + + def get_list_of_bridges(self): + """ + Obtains the list of bridges. + + Returns: + dict: Dictionary containing the success status, response HTTP status code, and data. + """ + url = f"https://{self.user_base_url}/api/v3.0/bridges" + headers = { + "Authorization": f"Bearer {self.access_token}", + "Accept": "application/json" + } + + response = requests.get(url, headers=headers) + response_json = response.json() + + logging.info(f"{response.status_code} in get_list_of_bridges") + + if response.status_code == 200: + success = True + self.bridges = [i for i in response_json['results']] + else: + success = False + + return { + "success": success, + "response_http_status": response.status_code, + "data": response_json + } + + def get_list_of_switches(self): + """ + Obtains the list of switches. + + Returns: + dict: Dictionary containing the success status, response HTTP status code, and data. + """ + url = f"https://{self.user_base_url}/api/v3.0/switches" + headers = { + "Authorization": f"Bearer {self.access_token}", + "Accept": "application/json" + } + + response = requests.get(url, headers=headers) + response_json = response.json() + + logging.info(f"{response.status_code} in get_list_of_switches") + + if response.status_code == 200: + success = True + self.switches = [i for i in response_json['results']] + else: + success = False + + return { + "success": success, + "response_http_status": response.status_code, + "data": response_json + } + + def get_list_of_available_devices(self, deviceType__in="camera"): + """ + Obtains the list of available devices. + + Returns: + dict: Dictionary containing the success status, response HTTP status code, and data. + """ + url = f"https://{self.user_base_url}/api/v3.0/availableDevices?deviceType__in={deviceType__in}" + headers = { + "Authorization": f"Bearer {self.access_token}", + "Accept": "application/json" + } + + response = requests.get(url, headers=headers) + response_json = response.json() + + logging.info(f"{response.status_code} in get_list_of_available_devices") + + if response.status_code == 200: + success = True + else: + success = False + + return { + "success": success, + "response_http_status": response.status_code, + "data": response_json + } + + def get_list_of_multi_cameras(self): + """ + Obtains the list of multi-cameras. + + Returns: + dict: Dictionary containing the success status, response HTTP status code, and data. + """ + url = f"https://{self.user_base_url}/api/v3.0/multiCameras" + headers = { + "Authorization": f"Bearer {self.access_token}", + "Accept": "application/json" + } + + response = requests.get(url, headers=headers) + response_json = response.json() + + logging.info(f"{response.status_code} in get_list_of_multi_cameras") + + if response.status_code == 200: + success = True + else: + success = False + + return { + "success": success, + "response_http_status": response.status_code, + "data": response_json + } + + def get_list_of_feeds(self): + """ + Obtains the list of feeds. + + Returns: + dict: Dictionary containing the success status, response HTTP status code, and data. + """ + url = f"https://{self.user_base_url}/api/v3.0/feeds" + headers = { + "Authorization": f"Bearer {self.access_token}", + "Accept": "application/json" + } + + response = requests.get(url, headers=headers) + response_json = response.json() + + logging.info(f"{response.status_code} in get_list_of_feeds") + + if response.status_code == 200: + success = True + else: + success = False + + return { + "success": success, + "response_http_status": response.status_code, + "data": response_json + } + + + def get_camera_by_id(self, esn): + found_camera = None + for camera in self.cameras: + if camera.id == esn: + found_camera = camera + break + + if found_camera == None: + camera = Camera() + + logging.debug(f"returning camera {camera} for search query {esn}") + return camera + + + + +class Device(): + + def __init__(self, id=None, name=None, status=None, account_id=None, user_base_url=None, een_instance=None): + + self.id = id + self.name = name + self.status = status + self.account_id = account_id + self.user_base_url = user_base_url, + self.een_instance = een_instance + + def get_id(self): + return self.id + + def get_status(self): + if 'connectionStatus' in self.status: + return self.status['connectionStatus'] + return None + + def is_online(self): + if 'connectionStatus' in self.status: + return self.status['connectionStatus'] == "online" + return None + + def is_offline(self): + if 'connectionStatus' in self.status: + return self.status['connectionStatus'] == "deviceOffline" \ + or self.status['connectionStatus'] == "bridgeOffline" \ + or self.status['connectionStatus'] == "error" + return None + + def __repr__(self): + if self.is_online(): + online = '✅' + elif self.is_offline(): + online = '❌' + else: + online = '?' + return f"{online} [{self.id}] - {self.name}" + + + +class Camera(Device): + + def __init__(self, id=None, name=None, status=None, account_id=None, bridge_id=None, user_base_url=None, een_instance=None): + super().__init__(id=id, name=name, status=status, account_id=account_id, user_base_url=user_base_url, een_instance=een_instance) + self.bridge_id = bridge_id + self.previews = [] + self.videos = [] + self.events = { + 'status': [], + 'motion': [] + } + + + def get_list_of_events(self, start_timestamp=None, end_timestamp=None): + """ + Obtains the list of events. + + Returns: + dict: Dictionary containing the success status, response HTTP status code, and data. + """ + + if start_timestamp == None or end_timestamp == None: + logging.debug(f"get_list_of_events called without timestamp") + return { + "success": False, + "response_http_status": None, + "data": { 'msg': 'get_list_of_events called without required args, needs start_timestamp, end_timestamp' } + } + + url = f"https://{self.user_base_url}/api/v3.0/events?pageSize=100&include=een.deviceCloudStatusUpdate.v1&startTimestamp__gte={start_timestamp}&endTimestamp__lte={end_timestamp}&actor=camera%3A{self.id}&type__in=een.deviceCloudStatusUpdateEvent.v1" + + headers = { + "Authorization": f"Bearer {self.een_instance.access_token}", + "Accept": "application/json" + } + + try: + response = requests.get(url, headers=headers, timeout=(3, 5)) + response_json = response.json() + + logging.debug(f"{response.status_code} returned from {url} with {headers} and {response.text}") + logging.info(f"{response.status_code} in get_list_of_events") + + if response.status_code == 200: + success = True + # filter events by type + [self.events['status'].append(i) for i in response.json()['results'] if i['type'] == 'een.deviceCloudStatusUpdateEvent.v1'] + [self.events['motion'].append(i) for i in response.json()['results'] if i['type'] == 'een.motionDetectionEvent.v1'] + + # remove duplicates + seen = set() + self.events['status'] = [event for event in self.events['status'] if event['endTimestamp'] and event['id'] not in seen and not seen.add(event['id'])] + seen = set() + self.events['motion'] = [event for event in self.events['motion'] if event['id'] not in seen and not seen.add(event['id'])] + + # sort by event startTimestamp descending + self.events['status'] = sorted(self.events['status'], key=lambda x: x['startTimestamp'], reverse=True) + self.events['motion'] = sorted(self.events['motion'], key=lambda x: x['startTimestamp'], reverse=True) + else: + success = False + + except requests.exceptions.Timeout: + logging.warn(f"timeout expired for {self.id} get_list_of_events()") + return { + "success": False, + "response_http_status": 0, + "data": None + } + + except requests.exceptions.RequestException as e: + logging.warn(e) + return { + "success": False, + "response_http_status": 0, + "data": None + } + + + return { + "success": success, + "response_http_status": response.status_code, + "data": response_json + } + + + def get_live_preview(self): + + url = f"https://{self.user_base_url}/api/v3.0/media/liveImage.jpeg?deviceId={self.id}&type=preview" + + headers = { + "Authorization": f"Bearer {self.een_instance.access_token}", + "Accept": "image/jpeg" + } + + try: + response = requests.get(url, headers=headers, timeout=(3, 5)) + logging.info(f"{response.status_code} in get_live_preview") + + except requests.exceptions.Timeout: + logging.warn(f"timeout expired for {self.id} get_live_preview()") + response = None + except requests.exceptions.RequestException as e: + logging.warn(e) + response = None + + return response + + + + + + + + diff --git a/src/EagleEyev3/settings.py b/src/EagleEyev3/settings.py new file mode 100644 index 0000000..7a35d6d --- /dev/null +++ b/src/EagleEyev3/settings.py @@ -0,0 +1,14 @@ + +# Set up your application and get client id/secrete first +# https://developerv3.eagleeyenetworks.com/page/my-application +client_id = "" +client_secret = "" + +# you will need to add approved redirect_uris in your application +# this examples assumes you've added http://127.0.0.1:3333/login_callback +# change the following variables if you did something different +# Note: do not use localhost for server_host, use 127.0.0.1 instead +server_protocol = "http" +server_host = "127.0.0.1" +server_port = "3333" +server_path = "login_callback"