import datetime
import json
import logging
import time
from typing import Optional

import requests
from selenium import webdriver
from selenium.webdriver import ActionChains
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait

logger = logging.getLogger(__name__)


class flight_list(object):
    """Client for the ``sichuanair.com`` plugin endpoints used by this tool.

    ``login`` drives a browser through the web login form to obtain the
    ``_amro_sk`` session cookie; every ``request_*`` / ``check*`` method then
    sends that cookie string in the ``Cookie`` header of a plain HTTP call.

    Unless stated otherwise, a ``request_*`` method stores the decoded JSON
    response on an instance attribute, returns it, and yields ``None`` when
    the HTTP call or the JSON decoding fails.
    """

    # Seconds before an HTTP call is abandoned.  Without a timeout a stalled
    # server would block the caller forever.  Override per instance if needed.
    REQUEST_TIMEOUT = 60

    _JSON_ACCEPT = "application/json, text/javascript, */*; q=0.01"
    _SESSION_COOKIE_NAME = "_amro_sk"

    def __init__(self):
        self.url = r"http://me.sichuanair.com/login.shtml"
        self.login_url = "https://login.sichuanair.com/idp/AuthnEngine?currentAuth=urn_oasis_names_tc_SAML_2.0_ac_classes_BAMUsernamePassword"
        self.flight_list_url = "https://me.sichuanair.com/api/v1/plugins/LM_FLIGHT_LIST"
        self.flight_list_third_url = "https://me.sichuanair.com/api/v1/plugins/LM_FLIGHT_THIRD_LIST"
        self.task_flight_list_url = "https://me.sichuanair.com/api/v1/plugins/LM_TASK_ASSIGNMENT_LIST"
        self.LM_FJ_TASK_PG = "https://me.sichuanair.com/api/v1/plugins/LM_FJ_TASK_PG"
        self.task_filght_third_list_url = "https://cscsupplier.sichuanair.com/api/v1/plugins/LM_TASK_ASSIGNMENT_THIRD_LIST"
        self.LM_TSK_DINGDONG_url = "https://cscsupplier.sichuanair.com/api/v1/plugins/LM_TSK_DINGDONG"
        self.LM_TSK_SURE_PG_url = "https://cscsupplier.sichuanair.com/api/v1/plugins/LM_TSK_SURE_PG"
        self.LM_TSK_EMP_PGLIST_url = "https://cscsupplier.sichuanair.com/api/v1/plugins/LM_TSK_EMP_PGLIST"
        self.MM_GJJH_LIST = "https://cscsupplier.sichuanair.com/api/v1/plugins/MM_GJJH_LIST"
        self.LM_TSK_HANDOVER_url = "https://cscsupplier.sichuanair.com/api/v1/plugins/LM_TSK_HANDOVER"
        self.LM_FLIGHT_SEARCH_LIST = 'https://cscsupplier.sichuanair.com/api/v1/plugins/LM_FLIGHT_SEARCH_LIST'
        self.LM_TASK_ARCHIVE_LIST = 'https://cscsupplier.sichuanair.com/api/v1/plugins/LM_TASK_ARCHIVE_LIST'
        self.jobcard = 'https://cscsupplier.sichuanair.com'
        self.token = None
        # AMRO status: set to 1 once login() has found the session cookie.
        self.amro_status = 0
        self.flight_list_json = None
        self.flight_list_third_json = None

    # ------------------------------------------------------------------
    # Authentication
    # ------------------------------------------------------------------
    def start(self, username, password) -> Optional[str]:
        """Log in and cache the session cookie string in ``self.token``.

        Login failures are logged instead of raised; in that case the
        previously cached token (``None`` on a fresh instance) is returned.
        """
        try:
            self.token = self.login(username, password)
        except Exception:
            # Best-effort by design, but leave a trace so failures can be
            # diagnosed.  Credentials are deliberately not logged.
            logger.exception("Login failed")
        return self.token

    def _create_driver(self):
        """Return an Edge WebDriver, or a headless Chrome one if Edge fails."""
        try:
            options = webdriver.EdgeOptions()
            options.use_chromium = True
            return webdriver.Edge(options=options)
        except Exception:
            logger.info("Edge WebDriver unavailable, falling back to Chrome",
                        exc_info=True)
        chrome_options = Options()
        chrome_options.add_argument("--headless")
        chrome_options.add_argument("window-size=1920x1080")
        chrome_options.add_argument("--start-maximized")
        # Without this option, locating elements sometimes fails.
        chrome_options.add_argument('--disable-gpu')
        # NOTE(review): newer Selenium 4 releases replaced `executable_path`
        # with a Service object -- confirm against the installed version.
        return webdriver.Chrome(
            executable_path=r'D:\flightinfo\Google\Chrome\Application\chromedriver.exe',
            options=chrome_options)

    def login(self, username, password) -> Optional[str]:
        """Drive the web login form and return ``"_amro_sk=<value>"``.

        Returns ``None`` when the session cookie is not present after the
        form has been submitted.  Selenium errors (e.g. a wait timing out)
        propagate to the caller; the browser is closed in every case.
        """
        self.driver = self._create_driver()
        self.actions = ActionChains(self.driver)
        try:
            self.driver.get(self.url)
            login_button = WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.ID, 'singleSubmit')))
            time.sleep(0.5)
            self.actions.click(login_button)
            self.actions.perform()

            WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.NAME, "j_username"))
            ).send_keys(username)
            time.sleep(0.5)
            self.driver.find_element(By.NAME, "j_password").send_keys(password)
            time.sleep(0.5)
            # find_element_by_xpath() no longer exists in current Selenium 4;
            # find_element(By.XPATH, ...) works on both old and new versions.
            self.driver.find_element(By.XPATH, "//button[@type='button']").click()
            time.sleep(1)

            for cookie in self.driver.get_cookies():
                if cookie.get("name") == self._SESSION_COOKIE_NAME:
                    self.amro_status = 1
                    return "_amro_sk=" + cookie["value"]
            return None
        finally:
            # Always release the browser, including when a step above raises.
            self.driver.quit()

    def get_cookie(self) -> Optional[str]:
        """Return the cached session cookie string, or ``None`` if absent."""
        return self.token

    # ------------------------------------------------------------------
    # HTTP helpers
    # ------------------------------------------------------------------
    def _json_headers(self, cookie):
        """Build the request headers shared by all JSON endpoints."""
        return {"Accept": self._JSON_ACCEPT, "Cookie": cookie}

    def _post_json(self, url, data, cookie):
        """POST form ``data`` to ``url``; return decoded JSON or ``None``."""
        try:
            return requests.post(url=url, data=data,
                                 headers=self._json_headers(cookie),
                                 timeout=self.REQUEST_TIMEOUT).json()
        except Exception as exc:
            logger.warning("POST %s failed: %s", url, exc)
            return None

    # ------------------------------------------------------------------
    # API calls
    # ------------------------------------------------------------------
    def checkCookieSts(self, cookie):
        """Send a small query with ``cookie`` and return the reply's ``code``.

        Unlike the ``request_*`` methods, errors are not swallowed here:
        network, JSON-decoding and missing-key errors propagate.
        """
        url = "https://me.sichuanair.com/api/v1/plugins/PROCESS_CLAIM_TASK_PRO_LIST"
        data = {
            'user_id': '',
            'userId': '',
            'accountType': 'ARCHIVE',
            'page': '1',
            'rows': '11'
        }
        result = requests.post(url, data, headers=self._json_headers(cookie),
                               timeout=self.REQUEST_TIMEOUT).json()
        return result['code']

    def checkWorkjob(self, taskids, acno, taskType, actype, startDate, endDate, cookie):
        """Query ``LM_FJ_TASK_PG``; result is kept in ``self.LM_FJ_TASK_PG_json``."""
        data = {"taskids": taskids,
                "acno": acno,
                "taskType": taskType,
                "actype": actype,
                "startDate": startDate,
                "endDate": endDate
                }
        self.LM_FJ_TASK_PG_json = self._post_json(self.LM_FJ_TASK_PG, data, cookie)
        return self.LM_FJ_TASK_PG_json

    def request_jobcard(self, cookie, filename):
        """Download ``filename`` from the job-card host and return raw bytes.

        ``filename`` is appended directly to the host URL.  Returns ``None``
        when the request fails.
        """
        # TODO: needs automatic day-shift / night-shift logic.
        one_year_later = datetime.datetime.now() + datetime.timedelta(days=365)
        timestamp = int(one_year_later.timestamp() * 1000)
        header = {
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
            "Cookie": cookie}
        try:
            return requests.get(
                url=f'{self.jobcard}{filename}?_cache_timestamp={timestamp}',
                headers=header,
                timeout=self.REQUEST_TIMEOUT).content
        except Exception as exc:
            logger.warning("GET job card %s failed: %s", filename, exc)
            return None

    def request_MM_GJJH_LIST(self, cookie):
        """Query ``MM_GJJH_LIST`` for the window yesterday..today.

        The result is kept in ``self.MM_GJJH_LIST_json``.
        """
        current_day = datetime.date.today()
        yesterday = (current_day - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
        today = current_day.strftime("%Y-%m-%d")
        data = {"mfrpn": "",
                "zjcdats": yesterday,
                "zjcdate": today,
                "zwgh": "Y",
                "zlynam": "",
                "zbmtxt": "",
                "zkstxt": "",
                "zfdtxt": "",
                "zlgort": "TF20",
                "zghnam": "",
                "ghrbm": "",
                "ghrks": "",
                "ghrfd": "",
                "zghkcd": "",
                "sort": "ZBMTXT",
                "order": "asc"
                }
        self.MM_GJJH_LIST_json = self._post_json(self.MM_GJJH_LIST, data, cookie)
        return self.MM_GJJH_LIST_json

    def request_task_filght_list(self, flightDate: str, cookie):
        """Query ``LM_TASK_ASSIGNMENT_LIST`` for ``flightDate``.

        The result is kept in ``self.task_flight_list_json``.
        """
        # TODO: needs automatic day-shift / night-shift logic.
        data = {"airportCode": "ZUTF",
                "ddate": flightDate,
                "notView": "",
                "baseCode": "TF01",
                "actype1": "(A319|A320|A321)",
                "aclocArea1": "()",
                "tasktype1": "()",
                "shift": "",
                "tasktype": "",
                "actype": "A319,A320,A321",
                "aclocArea": "",
                "isshiftex": "",
                "repush": "",
                "page": "1",
                "rows": "500"
                }
        self.task_flight_list_json = self._post_json(self.task_flight_list_url, data, cookie)
        return self.task_flight_list_json

    def request_task_filght_third_list(self, flightDate: str, cookie):
        """Query ``LM_TASK_ASSIGNMENT_THIRD_LIST`` for ``flightDate``.

        The result is kept in ``self.task_filght_third_list``.
        """
        # TODO: needs automatic day-shift / night-shift logic.
        data = {"airportCode": "ZUTF",
                "ddate": flightDate,
                "notView": "",
                "baseCode": "TF01",
                "isThird": "Y",
                "actype1": "",
                "aclocArea1": "()",
                "tasktype1": "()",
                "shift": "",
                "tasktype": "",
                "actype": "(A319|A320|A321|B737NG|B737MAX|C919)",
                "aclocArea": "",
                "isshiftex": "",
                "repush": "",
                "page": "1",
                "rows": "500"
                }
        self.task_filght_third_list = self._post_json(self.task_filght_third_list_url, data, cookie)
        return self.task_filght_third_list

    def request_LM_TASK_ARCHIVE_LIST(self, flightDate: str, cookie):
        """Query ``LM_TASK_ARCHIVE_LIST`` from 00:00 of ``flightDate`` to 09:00 next day.

        ``flightDate`` must be formatted ``YYYYMMDD``; anything else raises
        ``ValueError``.  The result is kept in ``self.task_LM_TASK_ARCHIVE_LIST``.
        """
        # TODO: needs automatic day-shift / night-shift logic.
        next_day = datetime.datetime.strptime(flightDate, "%Y%m%d") + datetime.timedelta(days=1)
        data = {"startdatePlStart": flightDate + " 00:00:00",
                "startdatePlEnd": next_day.strftime("%Y%m%d") + " 09:00:00",
                "airportCode": 'ZUTF',
                "taskTypeDefault": "LM",
                "actype1": '(A319|A320|A321)',
                "actype": "A319,A320,A321",
                "baseCode": "TF01",
                "page": "1",
                "rows": "500"
                }
        self.task_LM_TASK_ARCHIVE_LIST = self._post_json(self.LM_TASK_ARCHIVE_LIST, data, cookie)
        return self.task_LM_TASK_ARCHIVE_LIST

    def request_LM_FLIGHT_SEARCH_LIST(self, flightDate: str, cookie):
        """Query ``LM_FLIGHT_SEARCH_LIST`` for ``flightDate``.

        The result is kept in ``self.task_LM_FLIGHT_SEARCH_LIST``.
        """
        # TODO: needs automatic day-shift / night-shift logic.
        data = {"base4code": "ZUTF",
                "flightDate": flightDate,
                "flightDate1": flightDate,
                "jcType": "",
                "actype1": '(A319|A320|A321)',
                "dep_4code": "",
                "arr_4code": "",
                "acno": '',
                "actype": "A319,A320,A321",
                "flightNo": "",
                "notView": "",
                "page": "1",
                "onlyAf": "",
                "rows": "500"
                }
        self.task_LM_FLIGHT_SEARCH_LIST = self._post_json(self.LM_FLIGHT_SEARCH_LIST, data, cookie)
        return self.task_LM_FLIGHT_SEARCH_LIST

    def request_filght_list(self, flightDate: str, cookie):
        """Query ``LM_FLIGHT_LIST`` for ``flightDate`` (e.g. ``2023-06-02``).

        The result is kept in ``self.flight_list_json``.
        """
        data = {"base4code": "ZUTF",
                "flightDate": flightDate,
                "tasktype": "",
                "notView": "",
                "jcType": "",
                "actype1": "(A319|A320|A321)",
                "actype": "A319,A320,A321",
                "acno": "",
                "dep_4code": "",
                "arr_4code": "",
                "flightNo": "",
                "repush": ""
                }
        self.flight_list_json = self._post_json(self.flight_list_url, data, cookie)
        return self.flight_list_json

    def request_LM_TSK_HANDOVER(self, dict: dict, cookie):
        """Post to ``LM_TSK_HANDOVER`` using ``dict["taskid"]`` and ``dict["sts"]``.

        A missing key raises ``KeyError``.  The result is kept in
        ``self.LM_TSK_HANDOVER``.  The parameter keeps its original name
        ``dict`` so existing keyword callers continue to work.
        """
        data = {
            "taskids": dict["taskid"],
            # Required; may need changing -- the impact is unknown.
            "tasksts": dict["sts"],
            "FunctionCode": "LM_TSK_HANDOVER"
        }
        self.LM_TSK_HANDOVER = self._post_json(self.LM_TSK_HANDOVER_url, data, cookie)
        return self.LM_TSK_HANDOVER

    def request_LM_TSK_DINGDONG(self, dict: dict, cookie):
        """Post the task fields in ``dict`` to ``LM_TSK_DINGDONG``.

        A missing key raises ``KeyError``.  The result is kept in
        ``self.LM_TSK_DINGDONG``.
        """
        data = {"tasksts": "1",  # required
                "taskid": "{}".format(dict["taskid"]),
                "acno": dict["acno"],
                "actype": dict["actype"],
                "tasktype": dict["tasktype"],
                "tatd": dict["tatd"],
                "msgInfo": dict["msgInfo"],
                "bay": dict["bay"],
                "wxemp": dict["wxemp"],  # required
                "fxemp": dict["fxemp"],
                "wx": dict["wx"],
                "fx": dict["fx"],
                "ecsj": dict["ecsj"],
                "ecsjEmp": "",
                "FunctionCode": "LM_TSK_DINGDONG"
                }
        self.LM_TSK_DINGDONG = self._post_json(self.LM_TSK_DINGDONG_url, data, cookie)
        return self.LM_TSK_DINGDONG

    def request_flight_third_list(self, flightDate: str, cookie):
        """Query ``LM_FLIGHT_THIRD_LIST`` for ``flightDate``.

        The result is kept in ``self.flight_third_list``.
        """
        data = {"base4code": "ZUTF",
                "flightDate": flightDate,
                "flightDateStart": flightDate,
                "tasktype": "",
                "notView": "",
                "isThird": "Y",
                "actype1": "(21N|319|320|321|32N|332|738|73G|A21N|A319|A319|A320|A321|A32N|A738|A73G|AZZZ|B737|C919|ARJ21)",
                "actype": "B737NG,B737MAX,A319,A320,A321,C919,ARJ21"
                }
        self.flight_third_list = self._post_json(self.flight_list_third_url, data, cookie)
        return self.flight_third_list

    def request_LM_TSK_SURE_PG(self, dict: dict, cookie):
        """Post the assignment described by ``dict`` to ``LM_TSK_SURE_PG``.

        Reads the keys ``empNos``, ``taskid``, ``type``, ``shiftDate`` and
        ``shift`` (in that order); a missing key raises ``KeyError``.  The
        result is kept in ``self.LM_TSK_SURE_PG``.
        """
        data = {"empNos": dict["empNos"],
                "taskid": dict["taskid"],
                "type": dict["type"],  # ECSJ (second send-off), WX or FX
                # A night-shift pre-flight task needs the previous day's date.
                "shiftDate": dict["shiftDate"],
                "shift": dict["shift"],
                "FunctionCode": "LM_TSK_SURE_PG"
                }
        self.LM_TSK_SURE_PG = self._post_json(self.LM_TSK_SURE_PG_url, data, cookie)
        return self.LM_TSK_SURE_PG

    def request_LM_TSK_EMP_PGLIST(self, taskid: str, shiftDate: str, shift: str, cookie):
        """Query ``LM_TSK_EMP_PGLIST`` for a task and shift.

        The result is kept in ``self.LM_TSK_EMP_PGLIST``.
        """
        data = {"flightDate": shiftDate,
                "shift": shift,
                "type": "WX",
                "taskid": taskid,
                "wx": "wx",
                "baseCode": "TF01",
                "deptNo": "",
                "fx": "fx",
                }
        self.LM_TSK_EMP_PGLIST = self._post_json(self.LM_TSK_EMP_PGLIST_url, data, cookie)
        return self.LM_TSK_EMP_PGLIST


if __name__ == '__main__':
    fl = flight_list()
    nowDay = datetime.datetime.now().replace(microsecond=0)
    nowDayStr = nowDay.strftime("%Y-%m-%d")