import traceback

import psycopg2

from unitls.LogerinTxt import app_logger


class flightDB:
    """Thin convenience wrapper around one psycopg2 connection and cursor.

    Every method assembles its SQL statement by plain text substitution of
    the table names, column names, conditions and values it is given.
    Nothing is escaped here, so callers must only pass trusted,
    already-quoted fragments.

    Errors are never raised to the caller: they are written to
    ``app_logger`` (and usually printed) and the method returns its failure
    value (``None`` unless documented otherwise).

    Transaction handling:

    * "Eager" methods commit on success and roll back on failure, so a
      failed statement does not leave the connection stuck in an aborted
      transaction.
    * Read methods also roll back on failure for the same reason.
    * "Lazy" methods (``lazy*`` and the eager methods called with an extra
      positional argument) never commit or roll back; the caller finishes
      the batch with ``FunctionCommit`` or ``FunctionRollback``.  Note that
      a rollback triggered by a failing eager/read call discards any lazy
      statements still pending on this connection.
    """

    def __init__(self, host, port, user, password, database):
        """Open the connection and create the single shared cursor."""
        self.conn = psycopg2.connect(
            host=host,
            port=port,
            user=user,
            password=password,
            database=database,
        )
        self.c = self.conn.cursor()

    # ------------------------------------------------------------------
    # internal helpers
    # ------------------------------------------------------------------
    def _handle_error(self, exc, sql="", show_traceback=True, rollback=False):
        """Log a failure and optionally print the traceback / roll back.

        Must be called from inside an ``except`` block so that
        ``traceback.format_exc()`` still sees the active exception.
        """
        if sql:
            app_logger.log_error(sql)
        app_logger.log_error(exc)
        if show_traceback:
            print(traceback.format_exc())
        if rollback:
            try:
                self.conn.rollback()
            except Exception as rollback_exc:
                # Stay best-effort: a broken connection must not turn the
                # swallowed error into a crash.
                app_logger.log_error(rollback_exc)

    def _fetch_all(self, curStr):
        """Execute a read statement and return all rows, or None on failure."""
        try:
            self.c.execute(curStr)
            return self.c.fetchall()
        except Exception as e:
            self._handle_error(e, curStr, rollback=True)
            return None

    # ------------------------------------------------------------------
    # schema
    # ------------------------------------------------------------------
    def initTable(self, tableName: str, primarykey: str, primarykeyStr: str, keyDict: dict):
        """Create ``tableName`` if it does not exist, then commit.

        ``primarykey``/``primarykeyStr`` are the name and type clause of the
        first column; ``keyDict`` maps each further column name to its type
        clause.  An empty ``keyDict`` creates a table with only the first
        column.
        """
        curStr = ""
        try:
            columns = ["{} {}".format(primarykey, primarykeyStr)]
            columns.extend("{} {}".format(name, keyDict[name]) for name in keyDict)
            # Joining the full list avoids a dangling comma when keyDict is empty.
            curStr = "create table if not exists {} (\n{}\n)".format(
                tableName, ",\n".join(columns)
            )
            self.c.execute(curStr)
            self.conn.commit()
        except Exception as e:
            self._handle_error(e, curStr, rollback=True)

    def copyTable(self, oldtablename: str, newTablename: str):
        """Drop ``newTablename`` and commit.

        NOTE(review): despite its name this method copies nothing and never
        uses ``oldtablename``; it only drops the target table.  Behaviour is
        kept as-is because callers may rely on it - confirm the intent.
        """
        curStr = "drop table {}".format(newTablename)
        try:
            self.c.execute(curStr)
            self.conn.commit()
        except Exception as e:
            self._handle_error(e, curStr, rollback=True)

    # ------------------------------------------------------------------
    # inserts
    # ------------------------------------------------------------------
    def insertData(self, tableName: str, data: dict, *args):
        """Insert one row built from ``data`` (column name -> value).

        Each value is embedded as ``'<str(value)>'`` without escaping, so a
        value containing a single quote must be escaped by the caller.
        When any extra positional argument is given the call is "lazy": the
        statement is executed but neither committed nor rolled back.
        """
        curStr = ""
        lazy = bool(args)
        try:
            column_list = ",".join("{}".format(name) for name in data)
            value_list = ",".join("'{}'".format(data[name]) for name in data)
            curStr = "insert into {} ({}) values ({})".format(
                tableName, column_list, value_list
            )
            self.c.execute(curStr)
            if not lazy:
                self.conn.commit()
        except Exception as e:
            # A lazy batch is left for the caller to commit or roll back.
            self._handle_error(e, curStr, rollback=not lazy)

    def lazyInsertData(self, tableName: str, data: dict):
        """Insert one row like ``insertData`` without committing."""
        self.insertData(tableName, data, 'lazy')

    def lazyInsertData2(self, tableName: str, curStr2, curStr3):
        """Execute ``insert into tableName (curStr2)values curStr3`` uncommitted.

        ``curStr2`` is the column list and ``curStr3`` the complete VALUES
        payload including its parentheses; both are inserted verbatim.
        """
        curStr = """insert into {} ({})values {}""".format(tableName, curStr2, curStr3)
        try:
            self.c.execute(curStr)
        except Exception as e:
            self._handle_error(e, curStr, show_traceback=False)

    def lazyInsertData3(self, curStr1, curStr2, curStr3):
        """Insert one (ID, A, B) row into ``display`` without committing.

        ``curStr1`` is embedded unquoted, ``curStr2`` and ``curStr3`` are
        wrapped in single quotes without escaping.
        """
        curStr = """insert into display (ID, A, B)values (%s,'%s','%s')""" % (
            curStr1,
            curStr2,
            curStr3,
        )
        try:
            self.c.execute(curStr)
        except Exception as e:
            self._handle_error(e, curStr, show_traceback=False)

    # ------------------------------------------------------------------
    # transaction control
    # ------------------------------------------------------------------
    def FunctionCommit(self):
        """Commit the current transaction (ends a lazy batch)."""
        self.conn.commit()

    def FunctionRollback(self):
        """Roll back the current transaction (discards a lazy batch)."""
        self.conn.rollback()

    # ------------------------------------------------------------------
    # deletes
    # ------------------------------------------------------------------
    def lazydeleteTable(self, tablename: str):
        """Delete every row of ``tablename`` without committing."""
        curStr = """delete from {}""".format(tablename)
        try:
            self.c.execute(curStr)
        except Exception as e:
            self._handle_error(e, curStr, show_traceback=False)

    def deleteTable(self, tablename: str, *condition: str):
        """Delete rows from ``tablename`` and commit.

        If a condition is supplied, only its first element is used as the
        WHERE clause; without one every row is deleted.

        Returns ``"ok"`` on success and ``"fail"`` on any error.
        """
        curStr = ""
        try:
            if condition:
                curStr = "delete from {} where {}".format(tablename, condition[0])
            else:
                curStr = "delete from {}".format(tablename)
            self.c.execute(curStr)
            self.conn.commit()
            return "ok"
        except Exception as e:
            self._handle_error(e, curStr, rollback=True)
            return "fail"

    def deleteSingledata(self, tablename: str, findkey: str):
        """Delete the rows of ``tablename`` matching the WHERE clause ``findkey`` and commit."""
        curStr = "delete from {} where {}".format(tablename, findkey)
        try:
            self.c.execute(curStr)
            self.conn.commit()
        except Exception as e:
            self._handle_error(e, curStr, rollback=True)

    # ------------------------------------------------------------------
    # reads
    # ------------------------------------------------------------------
    def sortTable(self, tablename: str, tableKey: str, fn):
        """Return all rows of ``tablename`` ordered by ``tableKey`` ``fn``.

        ``fn`` is inserted after the sort key (e.g. a sort direction).
        Returns ``None`` on failure.
        """
        curStr = "select * from {} order by {} {}".format(tablename, tableKey, fn)
        return self._fetch_all(curStr)

    def sort_queryTable(self, findkey: str, tablename: str, condition: str, tableKey: str, fn):
        """Return ``findkey`` columns of rows matching ``condition``, ordered by one key.

        Returns ``None`` on failure.
        """
        curStr = "select {} from {} where {} order by {} {}".format(
            findkey, tablename, condition, tableKey, fn
        )
        return self._fetch_all(curStr)

    def sort_queryTable2(self, findkey: str, tablename: str, condition: str,
                         tableKey: str, fn, tableKey1: str, fn1):
        """Like ``sort_queryTable`` but ordered by two keys in sequence.

        Returns ``None`` on failure.
        """
        curStr = "select {} from {} where {} order by {} {},{} {}".format(
            findkey, tablename, condition, tableKey, fn, tableKey1, fn1
        )
        return self._fetch_all(curStr)

    def queryTabel(self, tablename: str, key: str, condition: str):
        """Return ``key`` columns of rows matching ``condition``.

        The table must exist in the ``public`` schema (its lower-cased name
        is looked up first).  Returns ``[]`` when the table is missing or
        when any error occurs.
        """
        try:
            # Check that the table exists; this lookup is parameterised.
            curStr1 = """
                SELECT EXISTS (
                    SELECT * FROM pg_catalog.pg_tables
                    WHERE tablename = %s AND schemaname = 'public'
                );
            """
            self.c.execute(curStr1, (tablename.lower(),))
            table_exists = self.c.fetchone()[0]

            if not table_exists:
                print(f"表 '{tablename}' 不存在")
                # Return an empty list rather than None for uniform handling.
                return []

            # NOTE: the query itself is still assembled from raw fragments
            # (key, tablename and condition are inserted verbatim), so it is
            # NOT protected against SQL injection.
            curStr = f"SELECT {key} FROM {tablename} WHERE {condition}"
            self.c.execute(curStr)
            return self.c.fetchall()
        except Exception as e:
            self._handle_error(e, rollback=True)
            return []

    def getAlldata(self, tablename: str):
        """Return every row of ``tablename``, or ``None`` on failure."""
        curStr = """select * from {}""".format(tablename)
        return self._fetch_all(curStr)

    def getSingledata(self, findkey: str, tablename: str):
        """Return the ``findkey`` columns of every row, or ``None`` on failure."""
        curStr = "select {} from {}".format(findkey, tablename)
        return self._fetch_all(curStr)

    # ------------------------------------------------------------------
    # updates
    # ------------------------------------------------------------------
    def upDateItem(self, tablename: str, dateDic: dict, condition: str, *args):
        """Update the rows matching ``condition`` with ``dateDic`` (column -> SQL value).

        Values are inserted verbatim as ``column=value``, so string values
        must already carry their own quotes.  For the ``display`` table
        single quotes inside each value are doubled and afterwards every
        double quote in the SET clause becomes a single quote (presumably so
        callers can wrap string literals in double quotes - confirm with the
        callers).

        Nothing is executed when ``dateDic`` is empty.  When any extra
        positional argument is given the call is "lazy": no commit and no
        rollback are issued.
        """
        curStr = ""
        lazy = bool(args)
        is_display = tablename == 'display'
        try:
            assignments = []
            for column in dateDic:
                value = dateDic[column]
                if is_display:
                    value = str(value).replace("'", "''")
                assignments.append('{}={}'.format(column, value))

            setStr = ','.join(assignments)
            if is_display:
                setStr = setStr.replace('"', "'")

            curStr = "update {} set {} where {}".format(tablename, setStr, condition)
            if setStr != "":
                self.c.execute(curStr)
            if not lazy:
                self.conn.commit()
        except Exception as e:
            # A lazy batch is left for the caller to commit or roll back.
            self._handle_error(e, curStr, rollback=not lazy)

    def lazyUpdateItem(self, tablename: str, dateDic: dict, condition: str):
        """Update rows like ``upDateItem`` without committing."""
        self.upDateItem(tablename, dateDic, condition, 'lazy')

    # ------------------------------------------------------------------
    # teardown
    # ------------------------------------------------------------------
    def close(self):
        """Close the connection; failures are logged, not raised."""
        try:
            self.conn.close()
        except Exception as e:
            self._handle_error(e)