"""Database access helpers for the exam / practical-training backend.

All functions share the module-level ``db`` connection (a dmPython DB-API
connection) and use ``?`` positional placeholders for query parameters.
"""

import ast
import base64
from contextlib import closing
import datetime
import json

from Crypto.Cipher import AES
from Crypto.Random import get_random_bytes
import dmPython

import k8s_func

# NOTE(review): credentials and host are hard-coded in source; consider moving
# them to configuration / environment variables.
db = dmPython.connect(user='SYSDBA', password='dameng!!', host="36.138.114.105", port="32522")

# json.dumps() escapes non-ASCII characters, so the training name is compared
# in its escaped form (hence the raw string with literal "\u" sequences).
_FRONTEND_DB_TRAIN_MARKER = r'["\u524d\u7aef\u4e0e\u6570\u636e\u5e93\u7ed3\u5408"]'

# Base URL of the node that exposes the per-student NodePort services.
_NODE_BASE_URL = 'http://36.138.114.105:'


def convert_iso_to_database_format(iso_datetime_str):
    """Convert an ISO 8601 date-time string to ``YYYY-MM-DD HH:MM:SS``.

    A trailing ``Z`` is rewritten to ``+00:00`` before parsing. The UTC offset
    is not applied; the wall-clock fields are formatted as parsed.
    """
    parsed = datetime.datetime.fromisoformat(iso_datetime_str.replace('Z', '+00:00'))
    return parsed.strftime('%Y-%m-%d %H:%M:%S')


def _questions_for_teacher(bank_table, teacher_id):
    """Return every row of *bank_table* whose SUBJECT matches the teacher's.

    *bank_table* must be one of the fixed table names used in this module; it
    is interpolated into the SQL text, so never pass user input here.
    """
    sql = (f'SELECT {bank_table}.* FROM TEACHER_MIDDLE_SUBJECT INNER JOIN '
           f'{bank_table} ON TEACHER_MIDDLE_SUBJECT.SUBJECT = {bank_table}.SUBJECT '
           f'WHERE TEACHER_MIDDLE_SUBJECT.TEACHER_ID = (?)')
    with closing(db.cursor()) as cursor:
        cursor.execute(sql, (teacher_id,))
        return [list(row) for row in cursor.fetchall()]


# Fetch questions
def choice_question_func(ID):
    """Return the choice questions (as lists) for the subject of teacher *ID*."""
    return _questions_for_teacher('CHOICE_QUESTION_BANK', ID)


def completion_question_func(ID):
    """Return the completion questions (as lists) for the subject of teacher *ID*."""
    return _questions_for_teacher('COMPLETION_QUESTION_BANK', ID)


def t_or_f_question_func(ID):
    """Return the true/false questions (as lists) for the subject of teacher *ID*."""
    return _questions_for_teacher('T_OR_F_QUESTION_BANK', ID)


def find_classboss_succeed_func(teacher_id):
    """Map each class of *teacher_id* to ``[head_teacher, student_count]``.

    The CLASS column of TEACHER_MIDDLE_CLASS is parsed as a list literal.
    A class without any student rows maps to ``[None, 0]``.

    Raises:
        IndexError: if the teacher has no row in TEACHER_MIDDLE_CLASS.

    Example: ``find_classboss_succeed_func('111111')``
    """
    result = {}
    with closing(db.cursor()) as cursor:
        cursor.execute('SELECT CLASS FROM TEACHER_MIDDLE_CLASS WHERE TEACHER_ID=?',
                       (teacher_id,))
        # literal_eval only accepts literals, unlike eval() which would run
        # arbitrary code stored in the column.
        classes = ast.literal_eval(cursor.fetchall()[0][0])
        for class_id in classes:
            cursor.execute(
                "SELECT CLASS_MIDDLE_HEADTEACHER.HEADTEACHER FROM STUDENT_MIDDLE_CLASS INNER JOIN "
                "CLASS_MIDDLE_HEADTEACHER ON STUDENT_MIDDLE_CLASS.CLASS=CLASS_MIDDLE_HEADTEACHER.CLASS WHERE "
                "STUDENT_MIDDLE_CLASS.CLASS=?",
                (class_id,))
            # One row per student in the class, each carrying the head teacher.
            head_teachers = [row[0] for row in cursor.fetchall()]
            head_teacher = head_teachers[0] if head_teachers else None
            result[class_id] = [head_teacher, len(head_teachers)]
    return result


def find_student_succeed_func(ID):
    """Return the STUDENT row for *ID* as a list, with column 2 removed.

    After the removal, an empty value at index 5 is replaced with the
    "no phone number bound" placeholder text.
    NOTE(review): column 2 is presumably a sensitive field (e.g. password)
    and index 5 the phone number — confirm against the table schema.

    Raises:
        IndexError: if no student with that ID exists.

    Example: ``find_student_succeed_func('20240101')``
    """
    with closing(db.cursor()) as cursor:
        cursor.execute('SELECT * FROM STUDENT WHERE ID=?', (ID,))
        student = list(cursor.fetchall()[0])
    student.pop(2)
    if not student[5]:
        student[5] = '未绑定电话号码'
    return student


def find_class_student(Class):
    """Return the list of STUDENT_ID values belonging to class *Class*."""
    with closing(db.cursor()) as cursor:
        cursor.execute('SELECT STUDENT_ID FROM STUDENT_MIDDLE_CLASS WHERE CLASS=(?) ',
                       (Class,))
        return [row[0] for row in cursor.fetchall()]


def fetch_teacher_name(id):
    """Return TEACHER_NAME for the given teacher id.

    Raises:
        IndexError: if the teacher does not exist.
    """
    with closing(db.cursor()) as cursor:
        cursor.execute('SELECT TEACHER_NAME FROM TEACHER WHERE TEACHER_ID=(?)', (id,))
        return cursor.fetchall()[0][0]


def fetch_subject(id):
    """Return the SUBJECT of the first TEACHER_MIDDLE_SUBJECT row for teacher *id*.

    Raises:
        IndexError: if the teacher has no subject row.
    """
    with closing(db.cursor()) as cursor:
        cursor.execute('SELECT SUBJECT FROM TEACHER_MIDDLE_SUBJECT WHERE TEACHER_ID=(?)', (id,))
        return cursor.fetchall()[0][0]


def _students_in_classes(classes):
    """Return the student IDs of every class in *classes*, in order."""
    return [student for class_id in classes for student in find_class_student(class_id)]


# Store the paper in the TEST_BANK table
def save_test(choice_list, completion_list, judge_list, hour, min, stop_time, class_list, ID):
    """Publish a test and assign it to every student of the given classes.

    The question ID lists and the class list are stored as JSON text. The
    release time is "now" (second precision) and the stop time is
    ``stop_time`` days later.

    Returns:
        The fixed success message string.
    """
    choice_json = json.dumps(choice_list)
    completion_json = json.dumps(completion_list)
    judge_json = json.dumps(judge_list)
    class_json = json.dumps(class_list)
    now_time = datetime.datetime.now().replace(microsecond=0)
    end_time = now_time + datetime.timedelta(days=int(stop_time))
    subject = fetch_subject(ID)
    with closing(db.cursor()) as cursor:
        cursor.execute(
            'INSERT INTO TEST_BANK (CHOICE_ID,COMPLE_ID,T_OR_F_ID,HOUR,MIN,RELEASETIME,STOPTIME,CLASS,SUBJECT,TEACHER_ID) VALUES(?,?,?,?,?,?,?,?,?,?)',
            (choice_json, completion_json, judge_json, hour, min, now_time, end_time,
             class_json, subject, ID))
        db.commit()
        # Look up the ID of the paper just inserted into TEST_BANK.
        # NOTE(review): keyed on RELEASETIME only; two papers released within
        # the same second would be ambiguous.
        cursor.execute('SELECT ID FROM TEST_BANK WHERE RELEASETIME=?', (now_time,))
        test_id = cursor.fetchall()[0][0]
        print(test_id)
        # Find the students (round-trip through JSON instead of eval()).
        student_list = _students_in_classes(json.loads(class_json))
        print(student_list)
        # Assign the paper to every student; the class is taken from the
        # first six characters of the student ID.
        for student_id in student_list:
            cursor.execute(
                "INSERT INTO STUDENT_TEST (STUDENT_ID,CLASS,SUBJECT,TEST_ID,TF) VALUES (?,?,?,?,?)",
                (student_id, student_id[0:6], subject, test_id, 'false'))
        db.commit()
    return '发布成功'


def find_default_class_func(class_ID, teacher_ID):
    """Summarise the results of each of a teacher's tests for one class.

    Returns:
        A dict mapping the value of TEST_BANK column 8 to
        ``((column 5, column 6), average_score, scored_count)``. The average
        is ``0`` when no row has a score. Only STUDENT_TEST rows whose
        column 5 is not NULL are counted.
        NOTE(review): columns 5/6 are presumably RELEASETIME/STOPTIME and
        column 8 the test ID — confirm against the TEST_BANK schema.

    Example: ``find_default_class_func('202401', '111111')``
    """
    subject = fetch_subject(teacher_ID)
    summary = {}
    with closing(db.cursor()) as cursor:
        cursor.execute('select * from TEST_BANK WHERE TEACHER_ID=?', (teacher_ID,))
        window_by_test = {row[8]: (row[5], row[6]) for row in cursor.fetchall()}
        for test_key, window in window_by_test.items():
            cursor.execute('SELECT * FROM STUDENT_TEST WHERE SUBJECT=? AND CLASS=? AND TEST_ID=?',
                           (subject, class_ID, test_key))
            scores = [int(row[5]) for row in cursor.fetchall() if row[5] is not None]
            scored = len(scores)
            average = sum(scores) / scored if scored else 0
            summary[test_key] = window, average, scored
    return summary


def detailed_class(testID, classID):
    """Return the STUDENT_TEST rows of a test/class with the student name appended.

    Each returned tuple is the original row followed by the student's NAME.

    Raises:
        IndexError: if a STUDENT_TEST row refers to an unknown student.

    Example: ``detailed_class('33', '202401')``
    """
    with closing(db.cursor()) as cursor:
        cursor.execute('SELECT * FROM STUDENT_TEST WHERE TEST_ID=? AND CLASS=?', (testID, classID))
        rows = cursor.fetchall()
        detailed = []
        for row in rows:
            cursor.execute('SELECT NAME FROM STUDENT WHERE ID=?', (row[0],))
            detailed.append((*row, cursor.fetchall()[0][0]))
    return detailed


def train_question():
    """Return all question-bank rows whose SUBJECT is ``'Train'``.

    Returns:
        A dict with keys ``'choice'``, ``'completion'`` and ``'judge'``.
    """
    tables = (
        ('choice', 'CHOICE_QUESTION_BANK'),
        ('completion', 'COMPLETION_QUESTION_BANK'),
        ('judge', 'T_OR_F_QUESTION_BANK'),
    )
    Train = {}
    with closing(db.cursor()) as cursor:
        for key, table in tables:
            # Table names come from the fixed tuple above, never from input.
            cursor.execute(f'SELECT * FROM {table} WHERE SUBJECT=(?)', ('Train',))
            Train[key] = cursor.fetchall()
    return Train


# State shared between SendTrainTestFunc, SendLink and TeacherMark: the ID,
# JSON-encoded name and student list of the most recently published training.
testID = None
TrainName = ""
studentList = []


def SendTrainTestFunc(TrainChoice, TrainCompletion, TrainJudge, Hour, Min, Class, Train, teacher_id, startTime, endTime):
    """Publish a practical-training test and remember it in module state.

    Inserts one TRAINTEST row (lists stored as JSON text, SUBJECT fixed to
    ``'Train'``) and then sets the module globals ``TrainName``,
    ``studentList`` and ``testID`` for later use by ``SendLink`` and
    ``TeacherMark``.

    Returns:
        The fixed success message string.
    """
    global TrainName, studentList, testID
    choice_json = json.dumps(TrainChoice)
    completion_json = json.dumps(TrainCompletion)
    judge_json = json.dumps(TrainJudge)
    class_json = json.dumps(Class)
    train_json = json.dumps(Train)
    TrainName = train_json
    with closing(db.cursor()) as cursor:
        cursor.execute(
            'INSERT INTO TRAINTEST (CHOICE,COMPLE,JUDGE,HOUR,MIN,RELEASETIME,STOPTIME,CLASS,SUBJECT,TEACHER_ID,Train) VALUES'
            '(?,?,?,?,?,?,?,?,?,?,?)',
            (choice_json, completion_json, judge_json, Hour, Min, startTime, endTime,
             class_json, 'Train', teacher_id, train_json))
        db.commit()
        print('添加成功')
        # Find the students (round-trip through JSON instead of eval()).
        studentList = _students_in_classes(json.loads(class_json))
        # NOTE(review): keyed on RELEASETIME only; two trainings with the same
        # start time would be ambiguous.
        cursor.execute('SELECT ID FROM TRAINTEST WHERE RELEASETIME=?', (startTime,))
        testID = cursor.fetchall()[0][0]
        print('分配成功')
    return '发布成功'


num = 5


def SendLink():
    """Create a pod per student for the current training and store its links.

    Uses the module globals set by ``SendTrainTestFunc``. When the training
    name contains the front-end/database marker, pod kind ``1`` is created
    and three NodePort URLs are stored; otherwise pod kind ``0`` is created
    and the service ``ip`` is stored with the two remaining link columns set
    to the text ``'NULL'``.
    """
    is_frontend_db = _FRONTEND_DB_TRAIN_MARKER in TrainName
    prefix, pod_kind = ('s', 1) if is_frontend_db else ('n', 0)
    with closing(db.cursor()) as cursor:
        print('进入到函数')
        if is_frontend_db:
            print('s')
        for student_id in studentList:
            if is_frontend_db:
                print(student_id)
            name = prefix + str(student_id) + '-' + str(testID)
            k8s_func.create_pod(pod_kind, name)
            services = k8s_func.list_services()
            for service in services['list']:
                if service['name'] != name + '-service':
                    continue
                if is_frontend_db:
                    ports = service['ports']
                    links = tuple(_NODE_BASE_URL + str(ports[k]['node_port']) for k in range(3))
                else:
                    links = (service['ip'], 'NULL', 'NULL')
                # NOTE(review): 'NULL' is stored as text, not SQL NULL;
                # presumably readers compare against that string — verify.
                cursor.execute(
                    "INSERT INTO TRAINSCORE VALUES (?,?,?,?,?,?,?,?,?)",
                    (student_id, student_id[0:6], testID, 'false', 'NULL', 'NULL', *links))
        print('加入链接成功')
        db.commit()


def TeacherMark(CLASS, ID):
    """Create a pending TEACHERMARK row per class for the current training.

    Rows are only inserted when the current training name contains the
    front-end/database marker; otherwise the training name is printed.

    Args:
        CLASS: a list/tuple of class IDs, or the text of a list literal.
        ID: the teacher ID stored with each row.
    """
    if isinstance(CLASS, (list, tuple)):
        classes = CLASS
    else:
        # literal_eval accepts only literals, unlike eval().
        classes = ast.literal_eval(str(CLASS))
    with closing(db.cursor()) as cursor:
        if _FRONTEND_DB_TRAIN_MARKER in TrainName:
            for class_id in classes:
                cursor.execute('INSERT INTO TEACHERMARK VALUES(?,?,?,?) ',
                               (ID, testID, 'false', class_id))
        else:
            print(TrainName)
        db.commit()


def Find_details_Func(ID):
    """Return the finished (``TF='true'``) STUDENT_TEST rows of student *ID*.

    Returns:
        A dict mapping column 3 of each row to the full row.

    Example: ``Find_details_Func('20240101')``
    """
    with closing(db.cursor()) as cursor:
        cursor.execute("SELECT * FROM STUDENT_TEST WHERE STUDENT_ID=? AND TF='true' ", (ID,))
        return {row[3]: row for row in cursor.fetchall()}


# Fetch the papers that have not been graded yet
def MarkTrainFunc(ClassID, teacher_ID):
    """Return the ungraded trainings of a class for a teacher.

    Returns:
        A dict mapping column 1 of each pending TEACHERMARK row to the list
        of TRAINTEST rows with that ID.

    Example: ``MarkTrainFunc('202401', '111111')``
    """
    pending = {}
    with closing(db.cursor()) as cursor:
        cursor.execute("SELECT * FROM TEACHERMARK WHERE CLASS=? AND TEACHERID=? AND TF='false'",
                       (ClassID, teacher_ID))
        for mark in cursor.fetchall():
            cursor.execute("SELECT * FROM TRAINTEST WHERE ID=? ", (mark[1],))
            pending[mark[1]] = cursor.fetchall()
    return pending


def NotMarkTestFunc(testID):
    """Return the TRAINSCORE rows of a training keyed by student name.

    Raises:
        IndexError: if a TRAINSCORE row refers to an unknown student.

    Example: ``NotMarkTestFunc('9')``
    """
    rows_by_name = {}
    with closing(db.cursor()) as cursor:
        cursor.execute('SELECT * FROM TRAINSCORE WHERE TEST_ID=?', (testID,))
        for row in cursor.fetchall():
            cursor.execute('SELECT NAME FROM STUDENT WHERE ID=?', (row[0],))
            rows_by_name[cursor.fetchall()[0][0]] = row
    return rows_by_name


def submitScoreFunc(score, testID, suggestion):
    """Store the scores and suggestions of a training and mark it graded.

    Args:
        score: dict mapping student ID to score.
        testID: ID of the training being graded.
        suggestion: dict mapping student ID to the teacher's suggestion; must
            contain every key of *score*.

    Returns:
        The fixed string ``'SELECT'``.

    Example: ``score = {'20240101': '123', '20240102': '132'}``
    """
    with closing(db.cursor()) as cursor:
        for student_id in score:
            cursor.execute(
                'UPDATE TRAINSCORE SET TRAINRESULT=? ,CRECOMMENDATION=? WHERE TEST_ID=? AND STUDENT_ID=?',
                (score[student_id], suggestion[student_id], testID, student_id))
        cursor.execute('UPDATE TEACHERMARK SET TF=? WHERE TRAINID=?', ('true', testID))
        db.commit()
    return 'SELECT'


def getTrainFunc(teacherID):
    """Return all TRAINTEST rows published by teacher *teacherID*."""
    with closing(db.cursor()) as cursor:
        cursor.execute('SELECT * FROM TRAINTEST WHERE TEACHER_ID=?', (teacherID,))
        return cursor.fetchall()