Coverage for rfpy/api/endpoints/tags.py: 82%
85 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-31 16:00 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-31 16:00 +0000
1'''
2Manage tags - keywords that can be used to categorize questions
3'''
4from rfpy.api import fetch
5from typing import Callable, List
7from sqlalchemy import func, null
8from sqlalchemy.orm import Session
9from sqlalchemy.orm.exc import NoResultFound
11from rfpy.api import validate
12from rfpy.auth import perms
13from rfpy.suxint import http
14from rfpy.model import User, Organisation, Project, Participant, Section, QuestionInstance
15from rfpy.model.tags import tags_qinstances_table, Tag
16from rfpy.model.exc import DuplicateDataProvided
17from rfpy.web import serial
20@http
21def get_tags(session: Session, user: User) -> List[serial.Tag]:
22 '''
23 Get an array of all the Tags defined by your organization
24 '''
25 q = session.query(Tag)\
26 .join(Organisation)\
27 .filter(Organisation.id == user.org_id)
29 return [serial.Tag.from_orm(e) for e in q]
32@http
33def get_tags_section(session: Session, user: User, section_id: int) -> List[serial.TagGroup]:
34 '''
35 Get an array of objects, one for each tag, listing the question ids assigned to each
36 tag for the given section_id.
38 N.B. - this operation is not recursive only questions that are direct children of the given
39 section ID are considered
40 '''
41 project_id = session.query(Section.project_id).filter(
42 Section.id == section_id).scalar()
43 if project_id is None:
44 raise NoResultFound(f'Section ID {section_id} not found')
45 project = fetch.project(session, project_id)
47 sec = session.query(Section).get(section_id)
48 if sec is None:
49 raise NoResultFound(f"Section with ID {section_id} not found")
51 validate.check(user, perms.PROJECT_VIEW_QUESTIONNAIRE,
52 project=project, section_id=section_id)
54 subsections = fetch.get_subsections_recursive(
55 session, section_id).subquery()
56 q = (
57 session.query(
58 Tag.id.label('tag_id'),
59 func.GROUP_CONCAT(func.IF(subsections.c.parent_lvl_1 ==
60 None, QuestionInstance.id, null())).label('qids'),
61 func.GROUP_CONCAT(subsections.c.parent_lvl_1).label('section_ids'),
62 )
63 .select_from(Tag)
64 .join(tags_qinstances_table)
65 .join(QuestionInstance)
66 .join(subsections)
67 .group_by(Tag.id)
68 )
69 return [
70 {
71 'tag_id': r.tag_id,
72 'question_instance_ids': [int(qid) for qid in r.qids.split(',')] if r.qids else [],
73 'section_ids': [int(sid) for sid in r.section_ids.split(',')] if r.section_ids else []
74 }
75 for r in q
76 ]
79@http
80def post_tag(session: Session, user: User, tag_doc: serial.NewTag) -> serial.Tag:
81 '''
82 Create a new Tag for your organization
84 @permission MANAGE_ORGANISATION
85 '''
86 validate.check(user, perms.MANAGE_ORGANISATION,
87 target_org=user.organisation)
88 tag = Tag(**tag_doc)
89 if session.query(Tag)\
90 .filter(Tag.name == tag.name)\
91 .filter(Tag.organisation == user.organisation)\
92 .all():
93 raise DuplicateDataProvided(f"tag '{tag.name}' already exists")
95 user.organisation.tags.append(tag)
96 session.flush()
97 return serial.Tag.from_orm(tag)
100@http
101def put_tag(session: Session, user: User, tag_id: int, tag_doc: serial.NewTag) -> serial.Tag:
102 '''
103 Update the name and description of an existing tag
105 @permission MANAGE_ORGANISATION
106 '''
107 validate.check(user, perms.MANAGE_ORGANISATION,
108 target_org=user.organisation)
109 tag = session.query(Tag)\
110 .filter(Tag.id == tag_id, Tag.org_id == user.org_id)\
111 .one()
112 tag.name = tag_doc['name']
113 tag.description = tag_doc['description']
114 return serial.Tag.from_orm(tag)
117@http
118def delete_tag(session: Session, user: User, tag_id: int):
119 '''
120 Delete the tag with the given ID and remove all references to that tag
121 from related items.
123 @permission MANAGE_ORGANISATION
124 '''
125 validate.check(user, perms.MANAGE_ORGANISATION,
126 target_org=user.organisation)
127 tag = session.query(Tag)\
128 .filter(Tag.id == tag_id, Tag.org_id == user.org_id)\
129 .one()
130 session.delete(tag)
133def _tag_assign(
134 session: Session,
135 user: User,
136 tag_id: int,
137 tag_assigns_doc: serial.TagAssigns,
138 action_func: Callable):
139 '''
140 Processes a TagAssigns document listing question ids to be assigned/unassigned (linked/unlinked)
141 to the tag with the given ID. Takes care of checking that that question IDs belong to projects
142 visible to the current user.
144 Action to assign or unassign the tag is delegated to the 'action_func' callable.
145 '''
147 validate.check(user, perms.MANAGE_ORGANISATION,
148 target_org=user.organisation)
150 # Check tag exists for org
151 tag = session.query(Tag)\
152 .filter(Tag.id == tag_id, Tag.org_id == user.org_id)\
153 .one()
155 _add_questions_from_sections(session, user, tag_assigns_doc)
157 instance_ids = set(tag_assigns_doc['question_instance_ids'])
158 provided_ids = [i for i in instance_ids]
160 qi_q = session.query(QuestionInstance)\
161 .join(Project, Participant)\
162 .filter(Participant.org_id == user.org_id)\
163 .filter(QuestionInstance.id.in_(instance_ids))
165 action_func(tag, qi_q, instance_ids)
167 if instance_ids:
168 unmatched = ', '.join(str(i) for i in instance_ids)
169 raise ValueError(
170 f'The following Question Instance ids were not found: {unmatched}')
172 return provided_ids
175def _add_questions_from_sections(session: Session, user: User, tag_assigns_doc: serial.TagAssigns):
176 '''
177 Look up question instance ids for the sections IDs provided. Extend the list of provided
178 question_instance_id values in tag_assigns_docs.
179 '''
180 qids = tag_assigns_doc['question_instance_ids']
181 for sec in session.query(Section)\
182 .join(Project, Project.id == Section.project_id)\
183 .join(Participant)\
184 .filter(Participant.org_id == user.org_id)\
185 .filter(Section.id.in_(tag_assigns_doc['section_ids'])):
186 if tag_assigns_doc['recursive']:
187 q = (
188 session.query(QuestionInstance.id)
189 .filter(QuestionInstance.project_id == sec.project_id)
190 .filter(QuestionInstance.number.startswith(sec.number))
191 )
192 else:
193 q = session.query(QuestionInstance.id).filter(
194 QuestionInstance.section == sec)
195 qids.extend(r.id for r in q)
198@http
199def post_tag_assign(session: Session, user: User, tag_id: int, tag_assigns_doc) -> List[int]:
200 '''
201 Assign the Tag with the given ID to the question instances with ID values
202 provided in the TagAssigns document.
204 Duplicate assignments are silently ignored.
206 An array of assigned question IDs is returned.
208 @permission MANAGE_ORGANISATION
209 '''
210 def assign(tag, question_instances, qids_to_assign):
211 for qi in question_instances:
212 qids_to_assign.remove(qi.id)
213 if tag not in qi.tags:
214 qi.tags.append(tag)
216 return _tag_assign(session, user, tag_id, tag_assigns_doc, assign)
219@http
220def delete_tag_assign(session: Session, user: User, tag_id: int, tag_assigns_doc) -> List[int]:
221 '''
222 Un-assign the Tag with the given ID from the question instances with ID values
223 provided in the TagAssigns document.
225 An array of un-assigned question IDs is returned.
227 @permission MANAGE_ORGANISATION
228 '''
229 def un_assign(tag, question_instances, qids_to_assign):
230 for qi in question_instances:
231 qids_to_assign.remove(qi.id)
232 if tag in qi.tags:
233 qi.tags.remove(tag)
235 return _tag_assign(session, user, tag_id, tag_assigns_doc, un_assign)