adding pagination to get_list_of_cameras, fixes #2

get_list_of_videos
Mark Cotton 2023-08-14 17:17:57 -05:00
parent 6b2f61f9a3
commit 6c31e8b48d
1 changed files with 65 additions and 38 deletions

View File

@ -1,5 +1,5 @@
""" Python client for Eagle Eye Networks APIv3 """
version = "0.0.10"
version = "0.0.11"
__version__ = version
@ -468,56 +468,83 @@ class EagleEyev3():
Returns:
dict: Dictionary containing the success status, response HTTP status code, and data.
"""
url = f"https://{self.user_base_url}/api/v3.0/cameras?include=status"
headers = {
"Authorization": f"Bearer {self.access_token}",
"Accept": "application/json"
includes = ''.join(['status'])
page_size = 1000
nextPageToken = None
# emulating a do while toop in order to handle pagination, remember to break out of this loop
while True:
if nextPageToken:
url = f"https://{self.user_base_url}/api/v3.0/cameras?include={includes}&pageSize={page_size}&pageToken={nextPageToken}"
else:
url = f"https://{self.user_base_url}/api/v3.0/cameras?include={includes}&pageSize={page_size}"
headers = {
"Authorization": f"Bearer {self.access_token}",
"Accept": "application/json"
}
response = self._make_get_request(url=url, headers=headers, timeout='list_of_cameras')
if response:
response_json = response.json()
logging.info(f"{response.status_code} in get_list_of_cameras")
else:
return {
"success": False,
"response_http_status": 0,
"data": None
}
try:
response = requests.get(url, headers=headers, timeout=self._get_timeout_values('list_of_cameras'))
response_json = response.json()
logging.info(f"{response.status_code} in get_list_of_cameras")
if response.status_code == 200:
success = True
self.cameras = self.cameras + [
Camera(id=i['id'],\
name=i['name'],\
status=i['status'],\
account_id=i['accountId'],\
bridge_id=i['bridgeId'],\
user_base_url=self.user_base_url,\
een_instance=self)
for i in response_json['results'] if i['id'] not in [j.id for j in self.cameras]]
except requests.exceptions.Timeout:
logging.warn(f"timeout expired get_list_of_cameras()")
return {
"success": False,
"response_http_status": 0,
"data": None
}
for camera in self.cameras:
camera.user_base_url = self.user_base_url
except requests.exceptions.RequestException as e:
logging.warn(e)
return {
"success": False,
"response_http_status": 0,
"data": None
}
if 'nextPageToken' in response_json and len(response_json['nextPageToken']) > 0:
nextPageToken = response_json['nextPageToken']
else:
break
if response.status_code == 200:
success = True
self.cameras = [
Camera(id=i['id'],\
name=i['name'],\
status=i['status'],\
account_id=i['accountId'],\
bridge_id=i['bridgeId'],\
user_base_url=self.user_base_url,\
een_instance=self)
for i in response_json['results']]
for camera in self.cameras:
camera.user_base_url = self.user_base_url
else:
success = False
else:
success = False
break
return {
"success": success,
"response_http_status": response.status_code,
"data": response_json
}
def _make_get_request(self, url=None, headers={}, timeout='default'):
try:
response = requests.get(url, headers=headers, timeout=self._get_timeout_values(timeout))
return response
except requests.exceptions.Timeout:
logging.warn(f"timeout expired get_list_of_cameras()")
return None
except requests.exceptions.RequestException as e:
logging.warn(e)
return None
def get_list_of_bridges(self):
"""
Obtains the list of bridges.