"""Download queued video clips for every user stored in the local database.

For each user row the script refreshes the EagleEyev3 login, stores the
refresh token the client holds afterwards, lists that user's cameras and
hands every download row with status 'new' to a multiprocessing pool.
Each worker saves one clip to disk and records the outcome in the
``download`` table.
"""

import datetime as dt
import glob
import json
import logging
import os
import sqlite3
import sys
from contextlib import closing
from multiprocessing import Process, Pool

import requests

from EagleEyev3 import *
from settings import config

logger = logging.getLogger()
logger.setLevel('DEBUG')

DB_PATH = 'instance/project.db'

# Connection used by the parent process only (user/token bookkeeping and
# selecting pending downloads).  Workers open their own, see _record_status.
con = sqlite3.connect(DB_PATH)
cur = con.cursor()

# Maps the HTTP status reported by save_video_to_file to the text stored in
# download.status; any code not listed is recorded as 'unknown failure'.
_STATUS_BY_HTTP_CODE = {
    200: 'done',
    400: 'client failure',
    404: 'client failure',
    409: 'client failure',
    401: 'auth failure',
    403: 'auth failure',
    500: 'cloud failure',
    502: 'cloud failure',
    503: 'cloud failure',
    504: 'cloud failure',
}

_PENDING_DOWNLOADS_SQL = '''SELECT
    download.id,
    download.url,
    camera.device_id,
    download.start_timestamp,
    download.end_timestamp,
    download.status
FROM camera
JOIN download ON download.camera_id = camera.id
JOIN USER ON camera.user_id = user.id
WHERE download.status == ?
AND camera.device_id == ?
ORDER BY download.start_timestamp
LIMIT 10000;'''


def _setting(name, default):
    """Return ``config[name]`` when the key is present, else *default*."""
    return config[name] if name in config else default


def _status_for_http_code(code):
    """Translate an HTTP status code into the status text stored per download."""
    return _STATUS_BY_HTTP_CODE.get(code, 'unknown failure')


def _record_status(download_id, save_status, save_code):
    """Persist the outcome of one download using a short-lived connection.

    download() is executed inside Pool worker processes.  A SQLite connection
    opened in the parent must not be reused after fork(), so every write opens
    (and closes) its own connection instead of touching the module-level one.
    """
    with closing(sqlite3.connect(DB_PATH, timeout=30)) as worker_con:
        # The inner ``with`` commits on success and rolls back on error.
        with worker_con:
            worker_con.execute(
                "update download set status = ?, error = ? "
                "where download.id == ?;",
                (save_status, save_code, download_id),
            )


def run(args):
    """Fan the pending download rows out to the worker pool.

    Args:
        args: three-item sequence ``[rows, een_obj, cam]``.  ``rows`` is an
            iterable of download rows; the other two items are passed
            unchanged to every ``download`` call.
    """
    rows, een_obj, cam = args[0], args[1], args[2]
    # download() expects each row wrapped in a one-element list.
    tasks = [([row], een_obj, cam) for row in rows]
    # chunksize=1 hands the workers one task at a time.
    pool.map(download, tasks, 1)


def download(args):
    """Save one clip to disk and record the result in the database.

    Args:
        args: tuple ``([row], een_obj, cam)`` as built by ``run``.  ``row`` is
            indexed as (download id, url, camera device id, start timestamp,
            end timestamp, ...).  ``cam`` must provide ``save_video_to_file``.

    Returns:
        ``(save_code, download_id)``; ``save_code`` is ``None`` when the file
        was already on disk and nothing was fetched.
    """
    rows, een_obj, cam = args
    row = rows[0]
    download_id = row[0]
    url = row[1]
    camera_device_id = row[2]
    end = row[4]
    start = dt.datetime.fromisoformat(row[3])

    video_dir = _setting('video_dir', 'videos')
    if _setting('path_esn_first', True):
        path = f"{video_dir}/{camera_device_id}/{start.year}/{start.month}/{start.day}/"
    else:
        # NOTE(review): this layout also starts with the device id and merely
        # appends a second device-id directory; a date-first layout may have
        # been intended.  Left unchanged so already-saved files are still found.
        path = f"{video_dir}/{camera_device_id}/{start.year}/{start.month}/{start.day}/{camera_device_id}"

    os.makedirs(path, exist_ok=True)
    fname = f"{path}/{camera_device_id}_{start}-{end}.mp4"

    if os.path.isfile(fname):
        # file exists, don't download again and mark it accordingly
        save_status = 'already exists'
        save_code = None
    else:
        save_result = cam.save_video_to_file(url=url, filename=fname)
        save_code = save_result['response_http_status']
        save_status = _status_for_http_code(save_code)

    _record_status(download_id, save_status, save_code)
    return (save_code, download_id)


def _process_user(een, user_email):
    """Log in as *user_email*, store the client's refresh token, queue downloads."""
    if not een:
        logging.error('een object is None')
        return  # nothing below can work without a client object

    stored = cur.execute(
        "select user.refresh_token from user where user.email = ?;",
        (user_email,),
    ).fetchall()
    for row in stored:
        een.refresh_token = row[0]

    een.login_tokens(code=None, cascade=True, refresh_token=een.refresh_token)

    if not (een.current_user and 'email' in een.current_user):
        logging.info(f"failed to login for {user_email}")
        return

    # Materialise the rows first: the loop body executes another statement on
    # the same cursor, which would otherwise discard the pending result set.
    matches = cur.execute(
        "select user.refresh_token, user.id from user where user.email = ?;",
        (een.current_user['email'],),
    ).fetchall()
    for row in matches:
        # The token itself is deliberately not printed: it is a credential.
        print(f"found user {een.current_user['email']}, updating refresh_token")
        cur.execute(
            "update user set refresh_token = ? where id == ?;",
            (een.refresh_token, row[1]),
        )
        con.commit()

    een.get_list_of_cameras()

    # iterate through all the cameras this user has access to
    for current_camera in een.cameras:
        pending = cur.execute(
            _PENDING_DOWNLOADS_SQL, ('new', current_camera.id)
        ).fetchall()
        # Here is where we send the list of files to be run in the
        # multiprocessing pool
        run([pending, een, current_camera])


def main():
    """Create the worker pool and process every user found in the database."""
    global pool  # run() looks the pool up as a module-level name

    # the settings object decides how many workers the pool gets
    num_of_threads_in_pool = _setting('num_of_threads_in_pool', 4)
    pool = Pool(num_of_threads_in_pool)

    print("starting up...")
    try:
        een = EagleEyev3(config)
        logging.info(f"EagleEyev3 version: {een.__version__}")

        list_of_users = [r[0] for r in cur.execute('select email from user').fetchall()]
        for user_email in list_of_users:
            _process_user(een, user_email)
    finally:
        # Always release the workers, even when a user fails part-way through.
        pool.close()
        pool.join()

    print("Shutting down...")


if __name__ == '__main__':
    main()