Coverage for rfpy/api/endpoints/tags.py: 96%
85 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
1"""
2Manage tags - keywords that can be used to categorize questions
3"""
5from rfpy.api import fetch
6from typing import Callable, List
8from sqlalchemy import func, null
9from sqlalchemy.orm import Session
10from sqlalchemy.orm.exc import NoResultFound
12from rfpy.api import validate
13from rfpy.auth import perms
14from rfpy.suxint import http
15from rfpy.model import (
16 User,
17 Organisation,
18 Project,
19 Participant,
20 Section,
21 QuestionInstance,
22)
23from rfpy.model.tags import tags_qinstances_table, Tag
24from rfpy.model.exc import DuplicateDataProvided
25from rfpy.web import serial
@http
def get_tags(session: Session, user: User) -> List[serial.Tag]:
    """
    List every Tag that belongs to the current user's organisation.

    The Tag table is joined to Organisation and restricted to the organisation
    whose id equals ``user.org_id``; each matching row is converted to a
    ``serial.Tag`` document.
    """
    org_tags = (
        session.query(Tag)
        .join(Organisation)
        .filter(Organisation.id == user.org_id)
    )
    tag_docs = []
    for tag in org_tags:
        tag_docs.append(serial.Tag.model_validate(tag))
    return tag_docs
@http
def get_tags_section(
    session: Session, user: User, section_id: int
) -> List[serial.TagGroup]:
    """
    Get an array of objects, one for each of your organisation's tags, listing the
    question ids assigned to each tag for the given section_id.

    N.B. - question ids are only collected from rows whose ``parent_lvl_1`` is NULL
    (presumably the questions that are direct children of the given section - confirm
    against fetch.get_subsections_recursive). Non-NULL ``parent_lvl_1`` values are
    reported separately as ``section_ids``.

    Raises NoResultFound if no section with the given ID exists.

    @permission PROJECT_VIEW_QUESTIONNAIRE
    """
    # A single scalar lookup both proves the section exists and yields its project.
    project_id = (
        session.query(Section.project_id).filter(Section.id == section_id).scalar()
    )
    if project_id is None:
        raise NoResultFound(f"Section ID {section_id} not found")
    project = fetch.project(session, project_id)

    validate.check(
        user, perms.PROJECT_VIEW_QUESTIONNAIRE, project=project, section_id=section_id
    )

    subsections = fetch.get_subsections_recursive(session, section_id).subquery()
    # NOTE(review): GROUP_CONCAT / IF look like MySQL functions. If so, the
    # concatenated value is capped by group_concat_max_len and a truncated id list
    # would fail the int() parsing below - confirm the server setting is adequate.
    # NOTE(review): section_ids gets one entry per joined question row, so the same
    # section id may be repeated - confirm callers expect that.
    q = (
        session.query(
            Tag.id.label("tag_id"),
            func.GROUP_CONCAT(
                func.IF(subsections.c.parent_lvl_1 == None, QuestionInstance.id, null())  # noqa: E711
            ).label("qids"),
            func.GROUP_CONCAT(subsections.c.parent_lvl_1).label("section_ids"),
        )
        .select_from(Tag)
        .join(tags_qinstances_table)
        .join(QuestionInstance)
        .join(subsections)
        # Tags are owned by an organisation; only report the caller's own tags so
        # that tags other organisations attached to the same questions are not leaked.
        .filter(Tag.org_id == user.org_id)
        .group_by(Tag.id)
    )
    return [
        serial.TagGroup(
            tag_id=r.tag_id,
            # Both aggregates come back as comma separated strings, or NULL when empty.
            question_instance_ids=[int(qid) for qid in r.qids.split(",")]
            if r.qids
            else [],
            section_ids=[int(sid) for sid in r.section_ids.split(",")]
            if r.section_ids
            else [],
        )
        for r in q
    ]
@http
def post_tag(session: Session, user: User, tag_doc: serial.NewTag) -> serial.Tag:
    """
    Add a new Tag to the current user's organisation.

    Raises DuplicateDataProvided if the organisation already has a tag with the
    same name.

    @permission MANAGE_ORGANISATION
    """
    validate.check(user, perms.MANAGE_ORGANISATION, target_org=user.organisation)
    tag = Tag(name=tag_doc.name, description=tag_doc.description)

    same_name_q = session.query(Tag).filter(
        Tag.name == tag.name, Tag.organisation == user.organisation
    )
    clashes = same_name_q.all()
    if clashes:
        raise DuplicateDataProvided(f"tag '{tag.name}' already exists")

    user.organisation.tags.append(tag)
    # Flush before serialising the new tag (presumably so that generated values
    # such as its id are populated - confirm against the Tag model).
    session.flush()
    return serial.Tag.model_validate(tag)
@http
def put_tag(
    session: Session, user: User, tag_id: int, tag_doc: serial.NewTag
) -> serial.Tag:
    """
    Update the name and description of an existing tag

    Raises NoResultFound if the tag does not exist within your organisation and
    DuplicateDataProvided if another tag of your organisation already uses the
    requested name (the same rule enforced when a tag is created).

    @permission MANAGE_ORGANISATION
    """
    validate.check(user, perms.MANAGE_ORGANISATION, target_org=user.organisation)
    tag = session.query(Tag).filter(Tag.id == tag_id, Tag.org_id == user.org_id).one()

    # Check for a name clash *before* mutating the tag so that a pending change
    # is not flushed by the lookup. The tag being edited is excluded so that
    # re-saving it under its current name is still allowed.
    name_taken = (
        session.query(Tag.id)
        .filter(
            Tag.org_id == user.org_id,
            Tag.name == tag_doc.name,
            Tag.id != tag.id,
        )
        .first()
    )
    if name_taken is not None:
        raise DuplicateDataProvided(f"tag '{tag_doc.name}' already exists")

    tag.name = tag_doc.name
    tag.description = tag_doc.description
    return serial.Tag.model_validate(tag)
@http
def delete_tag(session: Session, user: User, tag_id: int):
    """
    Delete the tag with the given ID, provided it belongs to your organisation.

    Removal of references to the tag from related items is not performed
    explicitly here; it is presumably handled by the ORM/database cascade
    configured on the Tag model (confirm there).

    @permission MANAGE_ORGANISATION
    """
    validate.check(user, perms.MANAGE_ORGANISATION, target_org=user.organisation)
    owned_tag_q = (
        session.query(Tag)
        .filter(Tag.id == tag_id)
        .filter(Tag.org_id == user.org_id)
    )
    # .one() raises when the tag is missing or owned by another organisation.
    session.delete(owned_tag_q.one())
def _tag_assign(
    session: Session,
    user: User,
    tag_id: int,
    tag_assigns_doc: serial.TagAssigns,
    action_func: Callable,
):
    """
    Processes a TagAssigns document listing question ids to be assigned/unassigned
    (linked/unlinked) to the tag with the given ID. Takes care of checking that the
    question IDs belong to projects in which the current user's organisation is a
    participant.

    Action to assign or unassign the tag is delegated to the 'action_func' callable.
    It is called as ``action_func(tag, question_instance_query, pending_ids)`` and is
    expected to remove from ``pending_ids`` the id of every question instance it
    handles; whatever is left afterwards is reported as not found.

    Note that ``tag_assigns_doc.question_instance_ids`` is extended in place with the
    questions of any sections listed in the document.

    Returns the list of (de-duplicated) question instance ids that were requested.
    Raises ValueError if any requested id was not matched by the visibility query.
    """

    validate.check(user, perms.MANAGE_ORGANISATION, target_org=user.organisation)

    # Check tag exists for org - .one() raises if it does not
    tag = session.query(Tag).filter(Tag.id == tag_id, Tag.org_id == user.org_id).one()

    _add_questions_from_sections(session, user, tag_assigns_doc)

    instance_ids = set(tag_assigns_doc.question_instance_ids)
    # Snapshot taken before action_func starts removing ids from the set.
    provided_ids = list(instance_ids)

    # Restrict to questions of projects the user's organisation participates in.
    # Participant must be joined through Project: using the org id comparison as
    # the join's ON clause does not relate the participant row to the question's
    # project, so every question would match as soon as the organisation
    # participates in any project at all.
    # NOTE(review): .join(Participant) relies on the Project -> Participant
    # relationship being inferable, as in _add_questions_from_sections.
    qi_q = (
        session.query(QuestionInstance)
        .select_from(QuestionInstance)
        .join(Project, Project.id == QuestionInstance.project_id)
        .join(Participant)
        .filter(Participant.org_id == user.org_id)
        .filter(QuestionInstance.id.in_(instance_ids))
    )

    action_func(tag, qi_q, instance_ids)

    if instance_ids:
        unmatched = ", ".join(str(i) for i in instance_ids)
        raise ValueError(
            f"The following Question Instance ids were not found: {unmatched}"
        )

    return provided_ids
def _add_questions_from_sections(
    session: Session, user: User, tag_assigns_doc: serial.TagAssigns
):
    """
    Resolve the section IDs in tag_assigns_doc to question instance ids and append
    them, in place, to tag_assigns_doc.question_instance_ids.

    Only sections of projects in which the user's organisation is a participant are
    considered. When tag_assigns_doc.recursive is set, questions are matched by
    project and by their number starting with the section's number; otherwise only
    questions whose section is the given section are added.
    """
    collected = tag_assigns_doc.question_instance_ids
    visible_sections = (
        session.query(Section)
        .join(Project, Project.id == Section.project_id)
        .join(Participant)
        .filter(
            Participant.org_id == user.org_id,
            Section.id.in_(tag_assigns_doc.section_ids),
        )
    )
    for section in visible_sections:
        qid_query = session.query(QuestionInstance.id)
        if not tag_assigns_doc.recursive:
            qid_query = qid_query.filter(QuestionInstance.section == section)
        else:
            # Number prefix match picks up questions of nested sections as well.
            qid_query = qid_query.filter(
                QuestionInstance.project_id == section.project_id,
                QuestionInstance.number.startswith(section.number),
            )
        collected.extend(row.id for row in qid_query)
@http
def post_tag_assign(
    session: Session, user: User, tag_id: int, tag_assigns_doc: serial.TagAssigns
) -> List[int]:
    """
    Link the Tag with the given ID to every question instance listed in the
    TagAssigns document.

    A question instance that already carries the tag is left unchanged.

    Returns an array of the question IDs that were requested for assignment.

    @permission MANAGE_ORGANISATION
    """

    def link_tag(tag, matched_instances, pending_ids):
        for instance in matched_instances:
            # Tick the id off so that _tag_assign can report ids that never matched.
            pending_ids.remove(instance.id)
            if tag in instance.tags:
                continue
            instance.tags.append(tag)

    return _tag_assign(session, user, tag_id, tag_assigns_doc, link_tag)
@http
def delete_tag_assign(
    session: Session, user: User, tag_id: int, tag_assigns_doc: serial.TagAssigns
) -> List[int]:
    """
    Unlink the Tag with the given ID from every question instance listed in the
    TagAssigns document.

    A question instance that does not carry the tag is left unchanged.

    Returns an array of the question IDs that were requested for un-assignment.

    @permission MANAGE_ORGANISATION
    """

    def unlink_tag(tag, matched_instances, pending_ids):
        for instance in matched_instances:
            # Tick the id off so that _tag_assign can report ids that never matched.
            pending_ids.remove(instance.id)
            if tag not in instance.tags:
                continue
            instance.tags.remove(tag)

    return _tag_assign(session, user, tag_id, tag_assigns_doc, unlink_tag)