Coverage for rfpy/api/endpoints/issues.py: 98%
92 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
1"""
2Operations for managing issues - invitations to and responses from vendors
3"""
5from datetime import datetime
6from typing import Optional
8from sqlalchemy.orm.session import Session
10from rfpy.suxint import http
11from rfpy.auth import perms, AuthorizationFailure
12from rfpy.api import fetch, validate
13from rfpy.model import (
14 Organisation,
15 Issue,
16 NotSentIssue,
17 AuditEvent,
18 Project,
19 Participant,
20)
21from rfpy.model.humans import User
22from rfpy.web import serial, Pager
25@http
26def get_vendor(session: Session, user: User, q_org_id: str) -> serial.Supplier:
27 """
28 Information about a vendor organisation:
29 - Name & ID
30 - Issues from this buyer/participant
31 - Users (only exposed to Consultant organisations)
32 """
34 if not user.organisation.is_consultant:
35 raise AuthorizationFailure("Only Consultant users can view Vendor details")
37 # Will raise No Result Found if not in Suppliers list
38 vendor = user.organisation.suppliers.filter(Organisation.id == q_org_id).one()
40 issues = fetch.issues_for_respondent(session, user, q_org_id)
42 return serial.Supplier(
43 issues=[serial.Issue.model_validate(i) for i in issues],
44 organisation=serial.Organisation.model_validate(vendor),
45 users=[serial.User.model_validate(u) for u in vendor.users],
46 )
49@http
50def get_project_issue(
51 session: Session, user: User, project_id: int, issue_id: int
52) -> serial.Issue:
53 project = fetch.project(session, project_id)
54 validate.check(user, perms.PROJECT_ACCESS, project=project)
55 issue = project.get_issue(issue_id)
56 return serial.Issue.model_validate(issue)
59@http
60def get_project_issues(session: Session, user: User, project_id: int) -> serial.Issues:
61 """Fetch a list of Issue objects for the given project"""
62 project = fetch.project(session, project_id)
63 validate.check(user, perms.PROJECT_ACCESS, project=project)
64 fo = serial.Issue.model_validate
65 return serial.Issues([fo(i) for i in project.issues])
68@http
69def put_project_issue(
70 session: Session, user: User, project_id: int, issue_id: int, issue_doc: dict
71):
72 """
73 Update the properties of an existing Issue
75 Status cannot be updated this way - use post_issue_status instead
76 """
77 project = fetch.project(session, project_id)
78 validate.check(user, perms.PROJECT_ACCESS, project=project)
79 issue = project.get_issue(issue_id)
80 validate.check(user, perms.ISSUE_UPDATE, issue=issue)
82 change_list = []
84 for field, value in issue_doc.items():
85 change_list.append((field, getattr(issue, field), value))
86 setattr(issue, field, value)
88 AuditEvent.create(
89 session,
90 "ISSUE_UPDATED",
91 project=issue.project,
92 issue_id=issue.id,
93 user_id=user.id,
94 org_id=user.organisation.id,
95 object_id=issue.id,
96 timestamp=datetime.now(),
97 private=True,
98 change_list=change_list,
99 )
102@http
103def get_issue(session: Session, user: User, issue_id: int) -> serial.Issue:
104 issue = fetch.issue(session, issue_id)
105 validate.check(user, perms.PROJECT_ACCESS, project=issue.project)
106 return serial.Issue.model_validate(issue)
109@http
110def post_project_issue(
111 session: Session, user: User, project_id: int, new_issue_doc: serial.NewIssue
112) -> serial.Id:
113 """
114 Create a new Issue
116 Either respondent_id or respondent_email should be set, but not both
117 """
118 project = fetch.project(session, project_id)
119 validate.check(user, perms.ISSUE_CREATE, project=project)
120 label = new_issue_doc.label
121 respondent_id = new_issue_doc.respondent_id
122 if respondent_id is None:
123 issue = NotSentIssue(
124 respondent_email=new_issue_doc.respondent_email, label=label
125 )
126 else:
127 org: Organisation = (
128 session.query(Organisation).filter_by(id=respondent_id).one()
129 )
130 issue = NotSentIssue(respondent=org, label=label)
132 project.add_issue(issue)
133 session.flush()
134 evt = AuditEvent.create(
135 session, "ISSUE_CREATED", issue_id=issue.id, project=project, user=user
136 )
137 for k in serial.NewIssue.model_fields.keys():
138 if k not in ("respondent_id", "respondent_email"):
139 evt.add_change(k, "", getattr(issue, k))
140 session.add(evt)
142 return serial.Id(id=issue.id)
145@http
146def delete_project_issue(
147 session: Session, user: User, project_id: int, issue_id: int
148) -> None:
149 """
150 Delete the Issue with the given id
152 Only permitted at status:
153 - Not Sent
154 - Declined
155 - Retracted
157 At other statuses the Issue belongs to the Respondent organisation
158 therefore cannot be deleted by the Buyer
159 """
160 project = fetch.project(session, project_id)
161 validate.check(user, perms.PROJECT_ACCESS, project=project)
162 issue = project.get_issue(issue_id)
163 validate.check(user, perms.ISSUE_DELETE, issue=issue)
164 session.delete(issue)
165 evt = AuditEvent.create(
166 session, "ISSUE_DELETED", issue_id=issue_id, project=project, user=user
167 )
168 session.add(evt)
171@http
172def post_issue_status(
173 session: Session, user: User, issue_id: int, issue_status_doc: serial.IssueStatus
174) -> None:
175 """Change the status of the given issue"""
176 issue = fetch.issue(session, issue_id)
177 validate.check(user, perms.ISSUE_SUBMIT, project=issue.project)
178 issue.change_status(user, issue_status_doc.new_status)
181@http
182def get_issues(
183 session: Session,
184 user: User,
185 issue_sort: str,
186 sort_order: str,
187 q_org_id: Optional[str] = None,
188 issue_status: Optional[str] = None,
189 pager: Optional[Pager] = None,
190) -> serial.IssuesList:
191 """
192 Get an array of Issues from all projects visible to the current user.
194 """
195 if pager is None:
196 pager = Pager(page=1, page_size=100)
198 cols = (
199 Issue.id.label("issue_id"),
200 Issue.respondent_id,
201 Issue.respondent_email,
202 Project.id.label("project_id"),
203 Issue.status,
204 Issue.deadline,
205 Issue.submitted_date,
206 Issue.issue_date,
207 Project.title.label("project_title"),
208 Issue.winloss_exposed,
209 Issue.winloss_expiry,
210 Issue.label,
211 )
213 sort_cols = {
214 "deadline": Issue.deadline,
215 "submitted_date": Issue.submitted_date,
216 "issue_date": Issue.issue_date,
217 }
219 order_column = sort_cols[issue_sort]
220 ordering = order_column.desc() if sort_order == "desc" else order_column.asc()
222 iq = (
223 session.query(*cols)
224 .join(Project)
225 .join(Participant)
226 .filter(Participant.org_id == user.org_id)
227 )
229 if q_org_id:
230 iq = iq.filter(Issue.respondent_id == q_org_id)
232 if issue_status:
233 iq = iq.filter(Issue.status == issue_status)
235 total_records = iq.count()
237 records = iq.order_by(ordering).slice(pager.startfrom, pager.goto).all()
238 return serial.IssuesList(
239 data=[serial.ListIssue.model_validate(i) for i in records],
240 pagination=pager.as_pagination(total_records, len(records)),
241 )