Coverage for rfpy/vendor/api/issue.py: 98%

58 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-24 10:52 +0000

1from typing import List, TYPE_CHECKING 

2 

3from sqlalchemy import not_, and_ 

4 

5from rfpy.suxint import http 

6from rfpy.model import ( 

7 Issue, 

8 Project, 

9 IssueWatchList, 

10 Organisation, 

11 ProjectNote, 

12 AuditEvent, 

13) 

14from rfpy.vendor.validation import validate 

15from rfpy.web import serial 

16from rfpy.api import fetch 

17from rfpy.auth import perms 

18 

19if TYPE_CHECKING: 

20 from rfpy.model import User 

21 from sqlalchemy.orm import Session 

22 

23# columns exposed to respondent users 

24respondent_columns = ( 

25 Issue.id, 

26 Issue.status, 

27 Issue.issue_date, 

28 Issue.accepted_date, 

29 Issue.deadline, 

30 Issue.label, 

31 Issue.winloss_exposed, 

32 Issue.winloss_expiry, 

33 Issue.submitted_date, 

34 Issue.respondent_id, 

35 Issue.use_workflow, 

36 Project.title, 

37 Project.org_id, 

38 IssueWatchList.date_created.isnot(None).label("is_watched"), 

39) 

40 

41 

42def _issue_q(session, user): 

43 return ( 

44 session.query(Issue) 

45 .join(Project) 

46 .join(Organisation) 

47 .outerjoin( 

48 IssueWatchList, 

49 and_( 

50 IssueWatchList.user_id == user.id, IssueWatchList.issue_id == Issue.id 

51 ), 

52 ) 

53 .with_entities(*respondent_columns) 

54 ) 

55 

56 

57@http 

58def get_issue(session, effective_user, issue_id) -> serial.VendorIssue: 

59 # Using subscript notation here because we want to 

60 # reuse the list of columns but this won't work for returning 

61 # a single value 

62 issue = _issue_q(session, effective_user).filter(Issue.id == issue_id)[0] 

63 

64 validate(effective_user, issue) 

65 return issue._asdict() 

66 

67 

68@http 

69def get_issues(session, effective_user) -> List[serial.VendorIssue]: 

70 issues = ( 

71 _issue_q(session, effective_user) 

72 .filter( 

73 Issue.respondent_id == effective_user.organisation.id, 

74 not_(Issue.status.in_(("Not Sent", "Retracted"))), 

75 ) 

76 .order_by(Issue.issue_date.desc()) 

77 ) 

78 return [i._asdict() for i in issues] 

79 

80 

81""" Status Changes """ 

82 

83 

84@http 

85def post_issue_status( 

86 session: "Session", 

87 effective_user: "User", 

88 issue_id: int, 

89 issue_status_doc: serial.IssueStatus, 

90) -> None: 

91 issue = fetch.issue(session, issue_id) 

92 status = issue_status_doc.new_status 

93 

94 # This check is already effectively performed in Issue.changed_status 

95 # but double checking here 

96 permitted_statuses = ("Accepted", "Submitted", "Declined") 

97 if status not in permitted_statuses: 

98 raise ValueError("new status must be one of %s" % str(permitted_statuses)) 

99 status_perms = { 

100 "Accepted": perms.ISSUE_ACCEPT, 

101 "Submitted": perms.ISSUE_SUBMIT, 

102 "Declined": perms.ISSUE_DECLINE, 

103 } 

104 action = status_perms[status] 

105 validate(effective_user, issue=issue, action=action) 

106 

107 issue.change_status(effective_user, status) 

108 

109 

110@http 

111def post_issue_workflow(session, effective_user, issue_id, issue_workflow_doc): 

112 issue = fetch.issue(session, issue_id) 

113 validate(effective_user, issue=issue, action=perms.ISSUE_UPDATE_WORKFLOW) 

114 issue.use_workflow = issue_workflow_doc.use_workflow 

115 

116 

117""" NOTES """ 

118 

119 

120@http 

121def post_issue_note( 

122 session, effective_user, issue_id, respondent_note_doc: serial.RespondentNote 

123) -> serial.Id: 

124 issue = fetch.issue(session, issue_id) 

125 validate(effective_user, issue, action=perms.PROJECT_ADD_NOTE) 

126 project = issue.project 

127 

128 note = ProjectNote( 

129 kind="RespondentNote", 

130 project=project, 

131 note_text=respondent_note_doc.note_text, 

132 private=respondent_note_doc.private, 

133 user_id=effective_user.id, 

134 org_id=effective_user.org_id, 

135 target_org_id=None, # RespondentNotes don't set target_org_id 

136 ) 

137 

138 session.add(note) 

139 session.flush() 

140 

141 evt = AuditEvent.create( 

142 session, 

143 "PROJECT_NOTE_ADDED", 

144 project=project, 

145 object_id=note.id, 

146 user_id=note.user_id, 

147 org_id=note.org_id, 

148 ) 

149 evt.add_change("note_text", "", note.note_text) 

150 evt.add_change("private", "", note.private) 

151 session.add(evt) 

152 return serial.Id(id=note.id) 

153 

154 

155@http 

156def get_issue_notes(session, effective_user, issue_id) -> List[serial.ReadNote]: 

157 issue = fetch.issue(session, issue_id) 

158 validate(effective_user, issue, action=perms.ISSUE_VIEW_ANSWERS) 

159 nq = fetch.vendor_notes(issue, effective_user) 

160 rn = serial.ReadNote 

161 return [rn.model_validate(note) for note in nq]