123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415 |
import datetime
import json
import logging
import time

import requests
from selenium import webdriver
from selenium.webdriver import ActionChains
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
class flight_list(object):
    """HTTP client for plugin endpoints on me/cscsupplier.sichuanair.com.

    Authentication is done by driving a real browser through the login page
    (see ``login``) and extracting the ``_amro_sk`` session cookie.  Every
    ``request_*`` method takes that cookie string explicitly, POSTs a form
    payload to one endpoint, stores the decoded JSON reply on the instance
    and returns it.  On any failure those methods return ``None`` (and log
    the error) instead of raising.
    """

    # (connect, read) timeout in seconds applied to every HTTP call so a
    # stalled server cannot block the caller forever.  Override on the class
    # or on an instance if an endpoint legitimately needs longer.
    REQUEST_TIMEOUT = (10, 60)

    # "Accept" header shared by all JSON endpoints.
    _JSON_ACCEPT = "application/json, text/javascript, */*; q=0.01"

    _log = logging.getLogger(__name__)

    def __init__(self):
        """Set up endpoint URLs and empty session state; performs no I/O."""
        self.url = r"http://me.sichuanair.com/login.shtml"
        self.login_url = "https://login.sichuanair.com/idp/AuthnEngine?currentAuth=urn_oasis_names_tc_SAML_2.0_ac_classes_BAMUsernamePassword"
        self.flight_list_url = "https://me.sichuanair.com/api/v1/plugins/LM_FLIGHT_LIST"
        self.flight_list_third_url = "https://me.sichuanair.com/api/v1/plugins/LM_FLIGHT_THIRD_LIST"
        self.task_flight_list_url = "https://me.sichuanair.com/api/v1/plugins/LM_TASK_ASSIGNMENT_LIST"
        self.LM_FJ_TASK_PG = "https://me.sichuanair.com/api/v1/plugins/LM_FJ_TASK_PG"
        self.task_filght_third_list_url = "https://cscsupplier.sichuanair.com/api/v1/plugins/LM_TASK_ASSIGNMENT_THIRD_LIST"
        self.LM_TSK_DINGDONG_url = "https://cscsupplier.sichuanair.com/api/v1/plugins/LM_TSK_DINGDONG"
        self.LM_TSK_SURE_PG_url = "https://cscsupplier.sichuanair.com/api/v1/plugins/LM_TSK_SURE_PG"
        self.LM_TSK_EMP_PGLIST_url = "https://cscsupplier.sichuanair.com/api/v1/plugins/LM_TSK_EMP_PGLIST"
        self.MM_GJJH_LIST = "https://cscsupplier.sichuanair.com/api/v1/plugins/MM_GJJH_LIST"
        self.LM_TSK_HANDOVER_url = "https://cscsupplier.sichuanair.com/api/v1/plugins/LM_TSK_HANDOVER"
        self.LM_FLIGHT_SEARCH_LIST = 'https://cscsupplier.sichuanair.com/api/v1/plugins/LM_FLIGHT_SEARCH_LIST'
        self.LM_TASK_ARCHIVE_LIST = 'https://cscsupplier.sichuanair.com/api/v1/plugins/LM_TASK_ARCHIVE_LIST'
        # Base URL that job-card file paths are appended to.
        self.jobcard = 'https://cscsupplier.sichuanair.com'
        self.token = None
        # AMRO status: 0 until a login has produced the session cookie, then 1.
        self.amro_status = 0
        self.flight_list_json = None
        self.flight_list_third_json = None
        # Browser handles; populated by login().
        self.driver = None
        self.actions = None

    # ------------------------------------------------------------------
    # Authentication
    # ------------------------------------------------------------------
    def start(self, username, password):
        """Log in and cache the session cookie on ``self.token``.

        Best-effort: a failed login is logged, not raised, and leaves the
        previously cached token untouched.

        Returns:
            The cached token (``"_amro_sk=<value>"``) or ``None``.
        """
        try:
            self.token = self.login(username, password)
        except Exception:
            self._log.exception("Login failed")
        return self.token

    def _create_driver(self):
        """Return an Edge WebDriver, falling back to headless Chrome."""
        try:
            options = webdriver.EdgeOptions()
            options.use_chromium = True
            return webdriver.Edge(options=options)
        except Exception:
            self._log.warning(
                "Edge WebDriver unavailable; falling back to headless Chrome",
                exc_info=True)
        chrome_options = Options()
        chrome_options.add_argument("--headless")
        chrome_options.add_argument("window-size=1920x1080")
        chrome_options.add_argument("--start-maximized")
        # Without this option, locating elements sometimes misbehaves.
        chrome_options.add_argument('--disable-gpu')
        # NOTE(review): newer Selenium 4 releases dropped `executable_path`
        # in favour of a Service object - confirm against the installed
        # Selenium version before upgrading.
        return webdriver.Chrome(
            executable_path=r'D:\flightinfo\Google\Chrome\Application\chromedriver.exe',
            options=chrome_options)

    def login(self, username, password):
        """Drive the browser through the login form and return the cookie.

        Sets ``self.amro_status`` to 1 when the ``_amro_sk`` cookie is found.
        The browser is always closed before returning, including when a wait
        times out or an element lookup fails.

        Returns:
            ``"_amro_sk=<value>"`` on success, ``None`` if the cookie is not
            present after submitting the form.

        Raises:
            Any Selenium error raised while starting or driving the browser.
        """
        driver = self._create_driver()
        self.driver = driver
        self.actions = ActionChains(driver)
        try:
            driver.get(self.url)
            login_button = WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.ID, 'singleSubmit')))
            time.sleep(0.5)
            self.actions.click(login_button)
            self.actions.perform()
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.NAME, "j_username"))
            ).send_keys(username)
            time.sleep(0.5)
            driver.find_element(By.NAME, "j_password").send_keys(password)
            time.sleep(0.5)
            # find_element_by_xpath was removed in Selenium 4.3; the By-based
            # form works on every Selenium version.
            driver.find_element(By.XPATH, "//button[@type='button']").click()
            # Give the post-login redirects time to set the session cookie.
            time.sleep(1)
            for cookie in driver.get_cookies():
                # Match on the cookie *name* only, not on any field value.
                if cookie.get("name") == "_amro_sk":
                    self.amro_status = 1
                    return "_amro_sk=" + cookie["value"]
            return None
        finally:
            driver.quit()

    def get_cookie(self):
        """Return the cached session token, or ``None`` if not logged in."""
        return self.token

    # ------------------------------------------------------------------
    # HTTP helpers
    # ------------------------------------------------------------------
    def _json_headers(self, cookie):
        """Build the request headers used by every JSON endpoint."""
        return {"Accept": self._JSON_ACCEPT, "Cookie": cookie}

    def _post_json(self, url, data, cookie):
        """POST form ``data`` to ``url``; return decoded JSON or ``None``.

        Any failure (network error, timeout, non-JSON body) is logged and
        reported as ``None`` so callers keep their best-effort contract.
        """
        try:
            response = requests.post(
                url=url, data=data, headers=self._json_headers(cookie),
                timeout=self.REQUEST_TIMEOUT)
            return response.json()
        except Exception:
            self._log.exception("POST %s failed", url)
            return None

    # ------------------------------------------------------------------
    # Endpoints
    # ------------------------------------------------------------------
    def checkCookieSts(self, cookie):
        """Return the ``code`` field of a probe request made with ``cookie``.

        Unlike the ``request_*`` methods this one does not swallow errors.

        Raises:
            requests.RequestException: on network failure or timeout.
            ValueError: if the reply is not JSON.
            KeyError: if the reply has no ``code`` field.
        """
        url = "https://me.sichuanair.com/api/v1/plugins/PROCESS_CLAIM_TASK_PRO_LIST"
        data = {
            'user_id': '',
            'userId': '',
            'accountType': 'ARCHIVE',
            'page': '1',
            'rows': '11',
        }
        response = requests.post(
            url, data, headers=self._json_headers(cookie),
            timeout=self.REQUEST_TIMEOUT)
        return response.json()['code']

    def checkWorkjob(self, taskids, acno, taskType, actype, startDate, endDate, cookie):
        """POST to LM_FJ_TASK_PG; result is cached on ``LM_FJ_TASK_PG_json``.

        Returns:
            Decoded JSON reply, or ``None`` on failure.
        """
        data = {
            "taskids": taskids,
            "acno": acno,
            "taskType": taskType,
            "actype": actype,
            "startDate": startDate,
            "endDate": endDate,
        }
        self.LM_FJ_TASK_PG_json = self._post_json(self.LM_FJ_TASK_PG, data, cookie)
        return self.LM_FJ_TASK_PG_json

    def request_jobcard(self, cookie, filename):
        """Download the job-card file at path ``filename``.

        A ``_cache_timestamp`` query parameter one year in the future
        (milliseconds since the epoch) is appended to the URL.

        Returns:
            Raw response body as ``bytes``, or ``None`` on failure.
        """
        # TODO: day-shift / night-shift selection still needs to be automated.
        one_year_later = datetime.datetime.now() + datetime.timedelta(days=365)
        timestamp = int(one_year_later.timestamp() * 1000)
        header = {
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
            "Cookie": cookie,
        }
        target = f'{self.jobcard}{filename}?_cache_timestamp={timestamp}'
        try:
            return requests.get(
                url=target, headers=header, timeout=self.REQUEST_TIMEOUT).content
        except Exception:
            self._log.exception("GET %s failed", target)
            return None

    def request_MM_GJJH_LIST(self, cookie):
        """POST to MM_GJJH_LIST for the window yesterday..today.

        The result is cached on ``self.MM_GJJH_LIST_json``.

        Returns:
            Decoded JSON reply, or ``None`` on failure.
        """
        today = datetime.date.today()
        yesterday = today - datetime.timedelta(days=1)
        data = {
            "mfrpn": "",
            "zjcdats": yesterday.strftime("%Y-%m-%d"),
            "zjcdate": today.strftime("%Y-%m-%d"),
            "zwgh": "Y",
            "zlynam": "",
            "zbmtxt": "",
            "zkstxt": "",
            "zfdtxt": "",
            "zlgort": "TF20",
            "zghnam": "",
            "ghrbm": "",
            "ghrks": "",
            "ghrfd": "",
            "zghkcd": "",
            "sort": "ZBMTXT",
            "order": "asc",
        }
        self.MM_GJJH_LIST_json = self._post_json(self.MM_GJJH_LIST, data, cookie)
        return self.MM_GJJH_LIST_json

    def request_task_filght_list(self, flightDate: str, cookie, bc: str):
        """POST to LM_TASK_ASSIGNMENT_LIST for date ``flightDate``, shift ``bc``.

        The result is cached on ``self.task_flight_list_json``.

        Returns:
            Decoded JSON reply, or ``None`` on failure.
        """
        # TODO: day-shift / night-shift selection still needs to be automated.
        data = {
            "airportCode": "ZUTF",
            "ddate": flightDate,
            "notView": "",
            "baseCode": "TF01",
            "actype1": "(A319|A320|A321)",
            "aclocArea1": "()",
            "tasktype1": "()",
            "shift": bc,
            "tasktype": "",
            "actype": "A319,A320,A321",
            "aclocArea": "",
            "isshiftex": "",
            "repush": "",
            "page": "1",
            "rows": "500",
        }
        self.task_flight_list_json = self._post_json(
            self.task_flight_list_url, data, cookie)
        return self.task_flight_list_json

    def request_task_filght_third_list(self, flightDate: str, cookie, bc: str):
        """POST to LM_TASK_ASSIGNMENT_THIRD_LIST (``isThird`` = "Y").

        The result is cached on ``self.task_filght_third_list``.

        Returns:
            Decoded JSON reply, or ``None`` on failure.
        """
        # TODO: day-shift / night-shift selection still needs to be automated.
        data = {
            "airportCode": "ZUTF",
            "ddate": flightDate,
            "notView": "",
            "baseCode": "TF01",
            "isThird": "Y",
            "actype1": "",
            "aclocArea1": "()",
            "tasktype1": "()",
            "shift": bc,
            "tasktype": "",
            "actype": "(A319|A320|A321|B737NG|B737MAX|C919)",
            "aclocArea": "",
            "isshiftex": "",
            "repush": "",
            "page": "1",
            "rows": "500",
        }
        self.task_filght_third_list = self._post_json(
            self.task_filght_third_list_url, data, cookie)
        return self.task_filght_third_list

    def request_LM_TASK_ARCHIVE_LIST(self, flightDate: str, cookie):
        """POST to LM_TASK_ARCHIVE_LIST for one planning window.

        The window runs from ``flightDate`` 00:00:00 to 09:00:00 on the
        following day.  The result is cached on
        ``self.task_LM_TASK_ARCHIVE_LIST``.

        Args:
            flightDate: Date in ``YYYYMMDD`` form (parsed with ``%Y%m%d``).

        Returns:
            Decoded JSON reply, or ``None`` on request failure.

        Raises:
            ValueError: if ``flightDate`` is not in ``YYYYMMDD`` form.
        """
        # TODO: day-shift / night-shift selection still needs to be automated.
        next_day = datetime.datetime.strptime(flightDate, "%Y%m%d") + datetime.timedelta(days=1)
        data = {
            "startdatePlStart": flightDate + " 00:00:00",
            "startdatePlEnd": next_day.strftime("%Y%m%d") + " 09:00:00",
            "airportCode": 'ZUTF',
            "taskTypeDefault": "LM",
            "actype1": '(A319|A320|A321)',
            "actype": "A319,A320,A321",
            "baseCode": "TF01",
            "page": "1",
            "rows": "500",
        }
        self.task_LM_TASK_ARCHIVE_LIST = self._post_json(
            self.LM_TASK_ARCHIVE_LIST, data, cookie)
        return self.task_LM_TASK_ARCHIVE_LIST

    def request_LM_FLIGHT_SEARCH_LIST(self, flightDate: str, cookie):
        """POST to LM_FLIGHT_SEARCH_LIST for the single day ``flightDate``.

        The result is cached on ``self.task_LM_FLIGHT_SEARCH_LIST``.

        Returns:
            Decoded JSON reply, or ``None`` on failure.
        """
        # TODO: day-shift / night-shift selection still needs to be automated.
        data = {
            "base4code": "ZUTF",
            "flightDate": flightDate,
            "flightDate1": flightDate,
            "jcType": "",
            "actype1": '(A319|A320|A321)',
            "dep_4code": "",
            "arr_4code": "",
            "acno": '',
            "actype": "A319,A320,A321",
            "flightNo": "",
            "notView": "",
            "page": "1",
            "onlyAf": "",
            "rows": "500",
        }
        self.task_LM_FLIGHT_SEARCH_LIST = self._post_json(
            self.LM_FLIGHT_SEARCH_LIST, data, cookie)
        return self.task_LM_FLIGHT_SEARCH_LIST

    def request_filght_list(self, flightDate: str, cookie):
        """POST to LM_FLIGHT_LIST; result is cached on ``flight_list_json``.

        Args:
            flightDate: Date string, e.g. ``2023-06-02``.

        Returns:
            Decoded JSON reply, or ``None`` on failure.
        """
        data = {
            "base4code": "ZUTF",
            "flightDate": flightDate,
            "tasktype": "",
            "notView": "",
            "jcType": "",
            "actype1": "(A319|A320|A321)",
            "actype": "A319,A320,A321",
            "acno": "",
            "dep_4code": "",
            "arr_4code": "",
            "flightNo": "",
            "repush": "",
        }
        self.flight_list_json = self._post_json(self.flight_list_url, data, cookie)
        return self.flight_list_json

    def request_LM_TSK_HANDOVER(self, dict: dict, cookie):
        """POST to LM_TSK_HANDOVER; result is cached on ``LM_TSK_HANDOVER``.

        Args:
            dict: Mapping that must contain ``taskid`` and ``sts``.

        Returns:
            Decoded JSON reply, or ``None`` on request failure.

        Raises:
            KeyError: if a required key is missing from ``dict``.
        """
        data = {
            "taskids": dict["taskid"],
            # Required; may need changing - its effect is not yet understood.
            "tasksts": dict["sts"],
            "FunctionCode": "LM_TSK_HANDOVER",
        }
        self.LM_TSK_HANDOVER = self._post_json(self.LM_TSK_HANDOVER_url, data, cookie)
        return self.LM_TSK_HANDOVER

    def request_LM_TSK_DINGDONG(self, dict: dict, cookie):
        """POST to LM_TSK_DINGDONG; result is cached on ``LM_TSK_DINGDONG``.

        Args:
            dict: Mapping that must contain ``taskid``, ``acno``, ``actype``,
                ``tasktype``, ``tatd``, ``msgInfo``, ``bay``, ``wxemp``,
                ``fxemp``, ``wx``, ``fx`` and ``ecsj``.

        Returns:
            Decoded JSON reply, or ``None`` on request failure.

        Raises:
            KeyError: if a required key is missing from ``dict``.
        """
        data = {
            "tasksts": "1",  # required
            "taskid": str(dict["taskid"]),
            "acno": dict["acno"],
            "actype": dict["actype"],
            "tasktype": dict["tasktype"],
            "tatd": dict["tatd"],
            "msgInfo": dict["msgInfo"],
            "bay": dict["bay"],
            "wxemp": dict["wxemp"],  # required
            "fxemp": dict["fxemp"],
            "wx": dict["wx"],
            "fx": dict["fx"],
            "ecsj": dict["ecsj"],
            "ecsjEmp": "",
            "FunctionCode": "LM_TSK_DINGDONG",
        }
        self.LM_TSK_DINGDONG = self._post_json(self.LM_TSK_DINGDONG_url, data, cookie)
        return self.LM_TSK_DINGDONG

    def request_flight_third_list(self, flightDate: str, cookie):
        """POST to LM_FLIGHT_THIRD_LIST (``isThird`` = "Y").

        The result is cached on ``self.flight_third_list``.

        Returns:
            Decoded JSON reply, or ``None`` on failure.
        """
        data = {
            "base4code": "ZUTF",
            "flightDate": flightDate,
            "flightDateStart": flightDate,
            "tasktype": "",
            "notView": "",
            "isThird": "Y",
            "actype1": "(21N|319|320|321|32N|332|738|73G|A21N|A319|A319|A320|A321|A32N|A738|A73G|AZZZ|B737|C919)",
            "actype": "B737NG,B737MAX,A319,A320,A321,C919",
        }
        self.flight_third_list = self._post_json(
            self.flight_list_third_url, data, cookie)
        return self.flight_third_list

    def request_LM_TSK_SURE_PG(self, dict: dict, cookie):
        """POST to LM_TSK_SURE_PG; result is cached on ``LM_TSK_SURE_PG``.

        Args:
            dict: Mapping that must contain ``empNos``, ``taskid``, ``type``,
                ``shiftDate`` and ``shift``.

        Returns:
            Decoded JSON reply, or ``None`` on request failure.

        Raises:
            KeyError: if a required key is missing from ``dict``.
        """
        data = {
            "empNos": dict["empNos"],
            "taskid": dict["taskid"],
            # One of ECSJ (second send-off), WX or FX.
            "type": dict["type"],
            # Night-shift pre-flight tasks need the previous day's date.
            "shiftDate": dict["shiftDate"],
            "shift": dict["shift"],
            "FunctionCode": "LM_TSK_SURE_PG",
        }
        self.LM_TSK_SURE_PG = self._post_json(self.LM_TSK_SURE_PG_url, data, cookie)
        return self.LM_TSK_SURE_PG

    def request_LM_TSK_EMP_PGLIST(self, taskid: str, shiftDate: str, shift: str, cookie):
        """POST to LM_TSK_EMP_PGLIST; result is cached on ``LM_TSK_EMP_PGLIST``.

        Returns:
            Decoded JSON reply, or ``None`` on failure.
        """
        data = {
            "flightDate": shiftDate,
            "shift": shift,
            "type": "WX",
            "taskid": taskid,
            "wx": "wx",
            # NOTE(review): original comment listed EMP_NO, NAME, MAINLY_ROLE
            # here - presumably fields of the reply; confirm against the API.
            "baseCode": "TF01",
            "deptNo": "",
            "fx": "fx",
        }
        self.LM_TSK_EMP_PGLIST = self._post_json(
            self.LM_TSK_EMP_PGLIST_url, data, cookie)
        return self.LM_TSK_EMP_PGLIST
if __name__ == '__main__':
    # Manual entry point: build the client and compute today's date string.
    # No login or request is performed here.
    fl = flight_list()
    # fl.start()  # login left disabled
    nowDay = datetime.datetime.now().replace(microsecond=0)
    # ISO date form is the same "YYYY-MM-DD" layout.
    nowDayStr = nowDay.date().isoformat()
|