Compare commits

..

13 Commits

7 changed files with 1204 additions and 1044 deletions

View File

@ -3,6 +3,37 @@
## Summary ##
This is a python package for working with the Eagle Eye Networks APIv3. It takes some liberties with the API to make it more pythonic. There is plenty of sugar sprinkled in to make it a little easier to use.
## Features ##
This is still a very early project that hasn't reached 1.0.0 yet.
### Included ###
- Login through Oauth2 flow
- Login with refresh_token
- Reseller account switching
- List of Cameras (attached, available, multi)
- List of Bridges
- List of Accounts
- List of Status Events for Cameras
- List of Videos (main, mp4) for Camera
- Get live preview image
- Get list of available feeds
### Coming Soon ###
- Download video (main, mp4)
- Get live video (hls)
- Get Thumbnail image
- Archive (files)
- VSP
- PTZ
- Event Subscriptions
- Alerts
- Notifications
- Tags
## Getting Started ##
@ -34,6 +65,12 @@ Handle the callback from Oauth2 including the code, proceed to get access and re
oauth_object = een.login_tokens(code)
```
Alternatively you can login using the refresh_token
```
een.login_tokens(code=None, cascade=True, refresh_token=<refresh_token>)
```
You'll probably want to get a list of cameras
```

View File

@ -11,4 +11,5 @@ classifiers = ["License :: OSI Approved :: MIT License"]
dynamic = ["version", "description"]
[project.urls]
Home = "https://mcottondesign.com"
Home = "https://mcotton.space"
Source = "https://git.mcotton.space/mcotton/EagleEyev3"

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1 @@
__version__ = "0.0.25"

267
src/EagleEyev3/camera.py Normal file
View File

@ -0,0 +1,267 @@
import json
import logging
import requests
from datetime import datetime, timedelta
from pytz import timezone
from io import BytesIO
logging.basicConfig(level=logging.INFO)
from EagleEyev3.device import Device
class Camera(Device):
def __init__(self, id=None, name=None, status=dict(), account_id=None, bridge_id=None, user_base_url=None, een_instance=None):
super().__init__(id=id, name=name, status=status, account_id=account_id, user_base_url=user_base_url, een_instance=een_instance)
self.bridge_id = bridge_id
self.previews = []
self.videos = []
self.events = {
'status': [],
'motion': []
}
def get_list_of_events(self, start_timestamp=None, end_timestamp=None):
"""
Obtains the list of events.
Returns:
dict: Dictionary containing the success status, response HTTP status code, and data.
"""
if start_timestamp == None or end_timestamp == None:
logging.warn(f"get_list_of_events called without timestamp")
return {
"success": False,
"response_http_status": None,
"data": { 'msg': 'get_list_of_events called without required args, needs start_timestamp, end_timestamp' }
}
url = f"https://{self.user_base_url}/api/v3.0/events?pageSize=100&include=een.deviceCloudStatusUpdate.v1&startTimestamp__gte={start_timestamp}&endTimestamp__lte={end_timestamp}&actor=camera%3A{self.id}&type__in=een.deviceCloudStatusUpdateEvent.v1"
headers = {
"Authorization": f"Bearer {self.een_instance.access_token}",
"Accept": "application/json"
}
try:
response = requests.get(url, headers=headers, timeout=self.een_instance._get_timeout_values('list_of_events'))
response_json = response.json()
logging.debug(f"{response.status_code} returned from {url} with {headers} and {response.text}")
logging.info(f"{response.status_code} in get_list_of_events")
if response.status_code == 200:
success = True
# filter events by type
[self.events['status'].append(i) for i in response.json()['results'] if i['type'] == 'een.deviceCloudStatusUpdateEvent.v1']
[self.events['motion'].append(i) for i in response.json()['results'] if i['type'] == 'een.motionDetectionEvent.v1']
# remove duplicates
seen = set()
self.events['status'] = [event for event in self.events['status'] if event['id'] not in seen and not seen.add(event['id'])]
seen = set()
self.events['motion'] = [event for event in self.events['motion'] if event['id'] not in seen and not seen.add(event['id'])]
# sort by event startTimestamp descending
self.events['status'] = sorted(self.events['status'], key=lambda x: x['startTimestamp'], reverse=True)
self.events['motion'] = sorted(self.events['motion'], key=lambda x: x['startTimestamp'], reverse=True)
else:
success = False
except requests.exceptions.Timeout:
logging.warn(f"timeout expired for {self.id} get_list_of_events()")
return {
"success": False,
"response_http_status": 0,
"data": None
}
except requests.exceptions.RequestException as e:
logging.warn(e)
return {
"success": False,
"response_http_status": 0,
"data": None
}
return {
"success": success,
"response_http_status": response.status_code,
"data": response_json
}
def get_live_preview(self):
url = f"https://{self.user_base_url}/api/v3.0/media/liveImage.jpeg?deviceId={self.id}&type=preview"
headers = {
"Authorization": f"Bearer {self.een_instance.access_token}",
"Accept": "image/jpeg"
}
try:
response = requests.get(url, headers=headers, timeout=self.een_instance._get_timeout_values('live_preview'))
logging.info(f"{response.status_code} in get_live_preview")
except requests.exceptions.Timeout:
logging.warn(f"timeout expired for {self.id} get_live_preview()")
response = None
except requests.exceptions.RequestException as e:
logging.warn(e)
response = None
return response
def get_list_of_videos(self, start_timestamp=None, end_timestamp=None, stream_type='main', media_type='video', coalesce='true', include=['mp4Url']):
"""
Obtains the list of videos.
Returns:
dict: Dictionary containing the success status, response HTTP status code, and data.
"""
nextPageToken = None
include_str = ','.join(include)
if start_timestamp == None or end_timestamp == None:
logging.warn(f"get_list_of_videos called without timestamps")
return {
"success": False,
"response_http_status": None,
"data": { 'msg': 'get_list_of_videos called without required args, needs start_timestamp, end_timestamp' }
}
# emulating a do while toop in order to handle pagination, remember to break out of this loop
while True:
if nextPageToken:
url = f"https://{self.user_base_url}/api/v3.0/media?deviceId={self.id}&type={stream_type}&mediaType={media_type}&startTimestamp__gte={start_timestamp}&coalesce={coalesce}&include={include_str}&pageToken={nextPageToken}"
else:
url = f"https://{self.user_base_url}/api/v3.0/media?deviceId={self.id}&type={stream_type}&mediaType={media_type}&startTimestamp__gte={start_timestamp}&coalesce={coalesce}&include={include_str}"
headers = {
"Authorization": f"Bearer {self.een_instance.access_token}",
"Accept": "application/json"
}
response = self.een_instance._make_get_request(url=url, headers=headers, timeout='list_of_videos')
if response:
response_json = response.json()
logging.info(f"{response.status_code} in get_list_of_videos")
else:
return {
"success": False,
"response_http_status": 0,
"data": None
}
if response.status_code == 200:
success = True
self.videos = [i for i in response_json['results'] if i['startTimestamp'] not in [j['startTimestamp'] for j in self.videos]] + self.videos
# sort by event startTimestamp descending
self.videos = sorted(self.videos, key=lambda x: x['startTimestamp'], reverse=True)
if 'nextPageToken' in response_json and len(response_json['nextPageToken']) > 0:
nextPageToken = response_json['nextPageToken']
else:
break
else:
success = False
break
dups = {}
for video in self.videos:
if video['endTimestamp'] in dups:
dups[video['endTimestamp']] = dups[video['endTimestamp']] + 1
logging.debug(f"found duplicate endTimestamp: { video['endTimestamp'] }")
else:
dups[video['endTimestamp']] = 1
logging.debug(dups)
self.videos = [i for i in self.videos if dups[i['endTimestamp']] == 1]
self.videos = sorted(self.videos, key=lambda x: x['startTimestamp'], reverse=True)
return {
"success": success,
"response_http_status": response.status_code,
"data": self.videos
}
def save_video_to_file(self, url=None, filename=None):
success = False
status_code = None
data = {}
if url == None or filename == None:
logging.warn("save_video_to_file called without url and/or filename")
data = { 'message': 'save_video_to_file called without url and/or filename' }
return {
"success": success,
"response_http_status": status_code,
"data": data
}
headers = {
"Authorization": f"Bearer {self.een_instance.access_token}"
}
try:
with requests.get(url, stream=True, headers=headers, timeout=self.een_instance._get_timeout_values('recorded_video')) as response:
status_code = response.status_code
logging.info(f"{response.status_code} in save_video_to_file")
with open(filename, 'wb') as fname:
for chunk in response.iter_content(chunk_size=8192):
fname.write(chunk)
success = True
data = { 'message': f"video saved as {fname}" }
except requests.exceptions.Timeout:
logging.warn(f"timeout expired for {url} save_video_to_file")
data = { 'message': f"timeout expired for {url} save_video_to_file" }
except requests.exceptions.RequestException as e:
logging.warn(e)
data = { 'message': "Exception {e} in save_video_to_file" }
return {
"success": success,
"response_http_status": status_code,
"data": data
}

65
src/EagleEyev3/device.py Normal file
View File

@ -0,0 +1,65 @@
import json
import logging
import requests
from datetime import datetime, timedelta
from pytz import timezone
from io import BytesIO
logging.basicConfig(level=logging.INFO)
class Device():
def __init__(self, id=None, name=None, status=dict(), account_id=None, user_base_url=None, een_instance=None):
self.id = id
self.name = name
self.status = status
self.account_id = account_id
self.user_base_url = user_base_url,
self.een_instance = een_instance
def get_id(self):
return self.id
def get_status(self):
if 'connectionStatus' in self.status:
return self.status['connectionStatus']
return None
def is_online(self):
if 'connectionStatus' in self.status:
return self.status['connectionStatus'] == "online"
return None
def is_offline(self):
if 'connectionStatus' in self.status:
return self.status['connectionStatus'] != "online"
return None
def __repr__(self):
if self.is_online():
online = ''
elif self.is_offline():
online = ''
else:
online = ''
return f"{online} [{self.id}] - {self.name}"

View File

@ -0,0 +1,828 @@
import json
import logging
import requests
from datetime import datetime, timedelta
from pytz import timezone
from io import BytesIO
logging.basicConfig(level=logging.INFO)
from EagleEyev3.device import Device
from EagleEyev3.camera import Camera
class EagleEyev3():
"""
Class representing the EagleEyev3 client.
"""
from EagleEyev3._version import __version__
__all__ = ['EagleEyev3', 'Device', 'Camera']
def __init__(self, config=None):
"""
Initializes the EagleEyev3 client object.
"""
self.client_id = None
self.client_secret = None
self.access_token = None
self.reseller_access_token = None # this is the reseller user they switched from
self.refresh_token = None
self.redirect_uri = None
keys_to_check = ['client_id', 'client_secret', 'server_protocol', 'server_host', 'server_port', 'server_path']
if config and all([name in config for name in keys_to_check]):
self._load_vars_from_settings(config)
else:
if config is None:
logging.warn("config is None or was not passed into constructor")
else:
logging.error("config is missing keys")
self.user_base_url = None
self.current_user = None
self.users = []
self.bridges = []
self.cameras = []
self.switches = []
self.active_account = None
self.accounts = []
self.user_tz_obj = None
self.lazy_login = False
if self.lazy_login:
try:
self._load_access_token()
except FileNotFoundError as e:
logging.warn("self.lazy_login is set to {self.lazy_login} but could not find .lazy_login file to load")
# for use with request calls, can be an int, tuple, or None
# preferred would be a tuple with the first value is time to connect, second is time to first byte
# https://requests.readthedocs.io/en/latest/user/advanced/#timeouts
# called by self._get_timeout_values
self.timeout_values = {
'default': (3,5),
'login': (5, 30),
'logout': (5, 15),
'list_of_events': (6, 20),
'live_preview': (3, 5),
'switch_account': (5, 30),
'recorded_video': (20, 200) # giving it the best possible chance to succeed
}
def _load_vars_from_settings(self, config={}):
"""
Load variables from the settings module.
"""
self.client_id = config['client_id']
self.client_secret = config['client_secret']
self.server_protocol = config['server_protocol']
self.server_host = config['server_host']
self.server_port = config['server_port']
self.server_path = config['server_path']
# Combine server_protocol, server_host, and server_port to make the redirect_uri
# Note: Please see the note in settings.py about trailing slashes and modify this line if needed
if self.server_port == "" or self.server_port == None:
# if a port isn't specificed than don't prepend it with a colon
self.redirect_uri = f"{self.server_protocol}://{self.server_host}/{self.server_path}"
else:
self.redirect_uri = f"{self.server_protocol}://{self.server_host}:{self.server_port}/{self.server_path}"
def _save_access_token(self):
with open(".lazy_login", "w") as json_file:
json.dump({
'access_token': self.access_token,
'refresh_token': self.refresh_token,
'current_user': self.current_user
}, json_file)
def _load_access_token(self):
with open(".lazy_login", "r") as json_file:
saved = json.load(json_file)
if 'access_token' in saved:
self.access_token = saved['access_token']
if 'refresh_token' in saved:
self.refresh_token = saved['refresh_token']
self.get_base_url(cascade=True)
def time_now(self, escape=True):
if escape:
return requests.utils.quote(datetime.now(tz=self.user_tz_obj).isoformat(timespec='milliseconds'))
else:
return datetime.now(tz=self.user_tz_obj).isoformat(timespec='milliseconds')
def time_before(self, ts=None, hours=6, escape=True):
if ts == None:
ts = datetime.now(tz=self.user_tz_obj)
if type(ts) == str:
ts = datetime.fromisoformat(ts)
if escape:
return requests.utils.quote((ts - timedelta(hours=hours)).isoformat(timespec='milliseconds'))
else:
return (ts - timedelta(hours=hours)).isoformat(timespec='milliseconds')
def login_tokens(self, code=None, cascade=True, refresh_token=None):
"""
Obtains login tokens using the authorization code.
Args:
code (str): The authorization code.
cascade (bool): Indicates whether to cascade and get the base URL and current user information.
Returns:
dict: Dictionary containing the success status, response HTTP status code, data, and current user information.
"""
baseUrl = "https://auth.eagleeyenetworks.com/oauth2/token"
if refresh_token:
pathUrl = f"?grant_type=refresh_token&refresh_token={refresh_token}"
else:
pathUrl = f"?grant_type=authorization_code&scope=vms.all&code={code}&redirect_uri={self.redirect_uri}" # Note the trailing slash, make sure it matches the whitelist
url = baseUrl + pathUrl
# Send a POST request to obtain login tokens
response = requests.post(url, auth=(self.client_id, self.client_secret), timeout=self._get_timeout_values('login'))
response_json = response.json()
logging.info(f"{response.status_code} in login_tokens")
if response.status_code == 200:
success = True
self.access_token = response_json['access_token']
self.refresh_token = response_json['refresh_token']
if self.lazy_login:
self._save_access_token()
if cascade:
self.get_base_url(cascade=cascade)
else:
success = False
return {
"success": success,
"response_http_status": response.status_code,
"data": response_json,
'current_user': self.current_user
}
def logout(self):
"""
Revokes token.
Returns:
dict: Dictionary containing the success status, response HTTP status code, and data.
"""
url = "https://auth.eagleeyenetworks.com/oauth2/revoke"
payload = {
"token": self.access_token,
"token_type_hint": "access_token"
}
headers = {
"Content-type": "application/x-www-form-urlencoded"
}
# Send a POST request to obtain the base URL
response = requests.post(url, auth=(self.client_id, self.client_secret), data=payload, headers=headers, timeout=self._get_timeout_values('logout'))
response_text = response.text
logging.info(f"{response.status_code} in logout")
if response.status_code == 200:
success = True
else:
success = False
logging.info(f"call to logout: {response.status_code}")
self.access_token = None
self.refresh_token = None
return {
"success": success,
"response_http_status": response.status_code,
"data": response_text
}
def login_from_reseller(self, target_account_id=None, cascade=True):
"""
Obtains acces_token for a end-user account.
Args:
code (str): The authorization code.
target_account_id (str): Account ID that you want to get access to.
cascade (bool): Indicates whether to cascade and get the base URL and current user information.
Returns:
dict: Dictionary containing the success status, response HTTP status code, data, and current user information.
"""
# reset all these so they don't cross-accounts
self.users = []
self.bridges = []
self.cameras = []
self.switches = []
url = "https://auth.eagleeyenetworks.com/api/v3.0/authorizationTokens"
headers = {
"Authorization": f"Bearer {self.access_token}",
"Accept": "application/json"
}
payload = {
"type": "reseller",
"targetType": "account",
"targetId": target_account_id,
"scopes": [
"vms.all"
]
}
# Send a POST request to obtain login tokens
response = requests.post(url, headers=headers, json=payload, timeout=self._get_timeout_values('switch_account'))
response_json = response.json()
logging.info(f"{response.status_code} in login_from_reseller")
if response.status_code == 201: # POST is expected to return a 201
success = True
self.reseller_access_token = self.access_token
self.access_token = response_json['accessToken']
self.active_account = target_account_id
if self.lazy_login:
self._save_access_token()
if cascade:
self.get_base_url(cascade=cascade)
else:
success = False
return {
"success": success,
"response_http_status": response.status_code,
"data": response_json,
'current_user': self.current_user
}
def get_base_url(self, cascade=True):
"""
Obtains the base URL for the user.
Args:
cascade (bool): Indicates whether to cascade and get the current user information.
Returns:
dict: Dictionary containing the success status, response HTTP status code, and data.
"""
url = "https://api.eagleeyenetworks.com/api/v3.0/clientSettings"
headers = {
"Authorization": f"Bearer {self.access_token}",
"Accept": "application/json"
}
try:
# Send a GET request to obtain the base URL
response = requests.get(url, headers=headers, timeout=self._get_timeout_values('base_url'))
response_json = response.json()
logging.info(f"{response.status_code} in get_base_url")
except requests.exceptions.Timeout:
logging.warn(f"timeout expired for {self.id} get_base_url()")
return {
"success": False,
"response_http_status": 0,
"data": None
}
except requests.exceptions.RequestException as e:
logging.warn(e)
return {
"success": False,
"response_http_status": 0,
"data": None
}
if response.status_code == 200:
success = True
if 'httpsBaseUrl' in response_json and 'hostname' in response_json['httpsBaseUrl']:
self.user_base_url = response_json['httpsBaseUrl']['hostname']
if cascade:
self.get_current_user()
else:
success = False
return {
"success": success,
"response_http_status": response.status_code,
"data": response_json
}
def get_current_user(self):
"""
Obtains the information of the current user.
Returns:
dict: Dictionary containing the success status, response HTTP status code, and data.
"""
url = f"https://{self.user_base_url}/api/v3.0/users/self?include=timeZone"
headers = {
"Authorization": f"Bearer {self.access_token}",
"Accept": "application/json"
}
try:
# Send a GET request to obtain the current user information
response = requests.get(url, headers=headers, timeout=self._get_timeout_values('curent_user'))
response_json = response.json()
logging.info(f"{response.status_code} in get_current_user")
except requests.exceptions.Timeout:
logging.warn(f"timeout expired for get_current_user()")
return {
"success": False,
"response_http_status": 0,
"data": None
}
except requests.exceptions.RequestException as e:
logging.warn(e)
return {
"success": False,
"response_http_status": 0,
"data": None
}
if response.status_code == 200:
success = True
self.current_user = response_json
self.user_tz_obj = timezone(response_json['timeZone']['timeZone'])
if self.active_account is None:
self.active_account = response_json['accountId']
else:
success = False
return {
"success": success,
"response_http_status": response.status_code,
"data": response_json
}
def get_list_of_users(self):
"""
Obtains the list of users.
Returns:
dict: Dictionary containing the success status, response HTTP status code, and data.
"""
url = f"https://{self.user_base_url}/api/v3.0/users?include=timeZone"
headers = {
"Authorization": f"Bearer {self.access_token}",
"Accept": "application/json"
}
try:
response = requests.get(url, headers=headers, timeout=self._get_timeout_values('list_of_users'))
response_json = response.json()
logging.info(f"{response.status_code} in get_list_of_users")
except requests.exceptions.Timeout:
logging.warn(f"timeout expired for get_list_of_users()")
return {
"success": False,
"response_http_status": 0,
"data": None
}
except requests.exceptions.RequestException as e:
logging.warn(e)
return {
"success": False,
"response_http_status": 0,
"data": None
}
if response.status_code == 200:
success = True
self.users = [i for i in response_json['results']]
else:
success = False
return {
"success": success,
"response_http_status": response.status_code,
"data": response_json
}
def get_list_of_accounts(self):
"""
Obtains the list of accounts.
Returns:
dict: Dictionary containing the success status, response HTTP status code, and data.
"""
nextPageToken = None
# emulating a do while toop in order to handle pagination, remember to break out of this loop
while True:
if nextPageToken:
url = f"https://{self.user_base_url}/api/v3.0/accounts?pageToken={nextPageToken}"
else:
url = f"https://{self.user_base_url}/api/v3.0/accounts"
headers = {
"Authorization": f"Bearer {self.access_token}",
"Accept": "application/json"
}
response = self._make_get_request(url=url, headers=headers, timeout='list_of_accounts')
if response:
response_json = response.json()
logging.info(f"{response.status_code} in get_list_of_accounts")
else:
return {
"success": False,
"response_http_status": 0,
"data": None
}
if response.status_code == 200:
success = True
self.accounts = [i for i in response_json['results'] if i not in self.accounts] + self.accounts
if 'nextPageToken' in response_json and len(response_json['nextPageToken']) > 0:
nextPageToken = response_json['nextPageToken']
else:
break
else:
success = False
break
return {
"success": success,
"response_http_status": response.status_code,
"data": response_json
}
def get_list_of_cameras(self):
"""
Obtains the list of cameras.
Returns:
dict: Dictionary containing the success status, response HTTP status code, and data.
"""
includes = ''.join(['status'])
page_size = 1000
nextPageToken = None
# emulating a do while toop in order to handle pagination, remember to break out of this loop
while True:
if nextPageToken:
url = f"https://{self.user_base_url}/api/v3.0/cameras?include={includes}&pageSize={page_size}&pageToken={nextPageToken}"
else:
url = f"https://{self.user_base_url}/api/v3.0/cameras?include={includes}&pageSize={page_size}"
headers = {
"Authorization": f"Bearer {self.access_token}",
"Accept": "application/json"
}
response = self._make_get_request(url=url, headers=headers, timeout='list_of_cameras')
if response:
response_json = response.json()
logging.info(f"{response.status_code} in get_list_of_cameras")
else:
return {
"success": False,
"response_http_status": 0,
"data": None
}
if response.status_code == 200:
success = True
self.cameras = self.cameras + [
Camera(id=i['id'],\
name=i['name'],\
status=i['status'],\
account_id=i['accountId'],\
bridge_id=i['bridgeId'],\
user_base_url=self.user_base_url,\
een_instance=self)
for i in response_json['results'] if i['id'] not in [j.id for j in self.cameras]]
for camera in self.cameras:
camera.user_base_url = self.user_base_url
if 'nextPageToken' in response_json and len(response_json['nextPageToken']) > 0:
nextPageToken = response_json['nextPageToken']
else:
break
else:
success = False
break
return {
"success": success,
"response_http_status": response.status_code,
"data": response_json
}
def _make_get_request(self, url=None, headers={}, timeout='default'):
try:
logging.debug(f"_make_get_request url: {url}")
response = requests.get(url, headers=headers, timeout=self._get_timeout_values(timeout))
return response
except requests.exceptions.Timeout:
logging.warn(f"timeout expired _make_get_request {timeout}")
return None
except requests.exceptions.RequestException as e:
logging.warn(e)
return None
def get_list_of_bridges(self):
"""
Obtains the list of bridges.
Returns:
dict: Dictionary containing the success status, response HTTP status code, and data.
"""
url = f"https://{self.user_base_url}/api/v3.0/bridges"
headers = {
"Authorization": f"Bearer {self.access_token}",
"Accept": "application/json"
}
try:
response = requests.get(url, headers=headers, timeout=self._get_timeout_values('list_of_bridges'))
response_json = response.json()
logging.info(f"{response.status_code} in get_list_of_bridges")
except requests.exceptions.Timeout:
logging.warn(f"timeout expired for get_list_of_bridges()")
return {
"success": False,
"response_http_status": 0,
"data": None
}
except requests.exceptions.RequestException as e:
logging.warn(e)
return {
"success": False,
"response_http_status": 0,
"data": None
}
if response.status_code == 200:
success = True
self.bridges = [i for i in response_json['results']]
else:
success = False
return {
"success": success,
"response_http_status": response.status_code,
"data": response_json
}
def get_list_of_switches(self):
"""
Obtains the list of switches.
Returns:
dict: Dictionary containing the success status, response HTTP status code, and data.
"""
url = f"https://{self.user_base_url}/api/v3.0/switches"
headers = {
"Authorization": f"Bearer {self.access_token}",
"Accept": "application/json"
}
try:
response = requests.get(url, headers=headers, timeout=self._get_timeout_values('list_of_switches'))
response_json = response.json()
logging.info(f"{response.status_code} in get_list_of_switches")
except requests.exceptions.Timeout:
logging.warn(f"timeout expired for get_list_of_switches()")
return {
"success": False,
"response_http_status": 0,
"data": None
}
except requests.exceptions.RequestException as e:
logging.warn(e)
return {
"success": False,
"response_http_status": 0,
"data": None
}
if response.status_code == 200:
success = True
self.switches = [i for i in response_json['results']]
else:
success = False
return {
"success": success,
"response_http_status": response.status_code,
"data": response_json
}
def get_list_of_available_devices(self, deviceType__in="camera"):
"""
Obtains the list of available devices.
Returns:
dict: Dictionary containing the success status, response HTTP status code, and data.
"""
url = f"https://{self.user_base_url}/api/v3.0/availableDevices?deviceType__in={deviceType__in}"
headers = {
"Authorization": f"Bearer {self.access_token}",
"Accept": "application/json"
}
try:
response = requests.get(url, headers=headers, timeout=self._get_timeout_values('list_of_available_devices'))
response_json = response.json()
logging.info(f"{response.status_code} in get_list_of_available_devices")
except requests.exceptions.Timeout:
logging.warn(f"timeout expired for get_list_of_available_devices()")
return {
"success": False,
"response_http_status": 0,
"data": None
}
except requests.exceptions.RequestException as e:
logging.warn(e)
return {
"success": False,
"response_http_status": 0,
"data": None
}
if response.status_code == 200:
success = True
else:
success = False
return {
"success": success,
"response_http_status": response.status_code,
"data": response_json
}
def get_list_of_multi_cameras(self):
"""
Obtains the list of multi-cameras.
Returns:
dict: Dictionary containing the success status, response HTTP status code, and data.
"""
url = f"https://{self.user_base_url}/api/v3.0/multiCameras"
headers = {
"Authorization": f"Bearer {self.access_token}",
"Accept": "application/json"
}
try:
response = requests.get(url, headers=headers, timeout=self._get_timeout_values('list_of_multi_cameras'))
response_json = response.json()
logging.info(f"{response.status_code} in get_list_of_multi_cameras")
except requests.exceptions.Timeout:
logging.warn(f"timeout expired for get_list_of_multi_cameras()")
return {
"success": False,
"response_http_status": 0,
"data": None
}
except requests.exceptions.RequestException as e:
logging.warn(e)
return {
"success": False,
"response_http_status": 0,
"data": None
}
if response.status_code == 200:
success = True
else:
success = False
return {
"success": success,
"response_http_status": response.status_code,
"data": response_json
}
def get_list_of_feeds(self):
"""
Obtains the list of feeds.
Returns:
dict: Dictionary containing the success status, response HTTP status code, and data.
"""
url = f"https://{self.user_base_url}/api/v3.0/feeds"
headers = {
"Authorization": f"Bearer {self.access_token}",
"Accept": "application/json"
}
try:
response = requests.get(url, headers=headers, timeout=self._get_timeout_values('list_of_feeds'))
response_json = response.json()
logging.info(f"{response.status_code} in get_list_of_feeds")
except requests.exceptions.Timeout:
logging.warn(f"timeout expired for get_list_of_feeds()")
return {
"success": False,
"response_http_status": 0,
"data": None
}
except requests.exceptions.RequestException as e:
logging.warn(e)
return {
"success": False,
"response_http_status": 0,
"data": None
}
if response.status_code == 200:
success = True
else:
success = False
return {
"success": success,
"response_http_status": response.status_code,
"data": response_json
}
def get_camera_by_id(self, esn):
found_camera = None
for camera in self.cameras:
if camera.id == esn:
found_camera = camera
break
if found_camera == None:
camera = Camera()
logging.debug(f"returning camera {camera} for search query {esn}")
return camera
def _get_timeout_values(self, name='default'):
if name in self.timeout_values:
return self.timeout_values[name]
else:
return self.timeout_values['default']