Coverage for rfpy/api/endpoints/tags.py: 96%

85 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-24 10:52 +0000

1""" 

2Manage tags - keywords that can be used to categorize questions 

3""" 

4 

5from rfpy.api import fetch 

6from typing import Callable, List 

7 

8from sqlalchemy import func, null 

9from sqlalchemy.orm import Session 

10from sqlalchemy.orm.exc import NoResultFound 

11 

12from rfpy.api import validate 

13from rfpy.auth import perms 

14from rfpy.suxint import http 

15from rfpy.model import ( 

16 User, 

17 Organisation, 

18 Project, 

19 Participant, 

20 Section, 

21 QuestionInstance, 

22) 

23from rfpy.model.tags import tags_qinstances_table, Tag 

24from rfpy.model.exc import DuplicateDataProvided 

25from rfpy.web import serial 

26 

27 

@http
def get_tags(session: Session, user: User) -> List[serial.Tag]:
    """
    Return all Tags belonging to the calling user's organisation.

    Tag rows are selected through a join to Organisation, restricted to the
    organisation whose id equals user.org_id, and each row is converted with
    serial.Tag.model_validate.
    """
    org_tags = (
        session.query(Tag)
        .join(Organisation)
        .filter(Organisation.id == user.org_id)
    )
    return list(map(serial.Tag.model_validate, org_tags))

36 

37 

def _split_ids(concatenated):
    """
    Convert a comma separated string of integers (a GROUP_CONCAT result) into a
    list of ints. A falsy value (None or an empty string) yields an empty list.
    """
    if not concatenated:
        return []
    return [int(part) for part in concatenated.split(",")]


@http
def get_tags_section(
    session: Session, user: User, section_id: int
) -> List[serial.TagGroup]:
    """
    Get an array of objects, one for each tag, listing the question ids assigned to each
    tag for the given section_id.

    N.B. - question ids are only reported for rows of the subsection query whose
    `parent_lvl_1` column is NULL (presumably questions that are direct children of the
    given section - confirm against fetch.get_subsections_recursive). Non-NULL
    `parent_lvl_1` values are reported separately as `section_ids`.

    Raises NoResultFound if no project id can be found for the given section_id.

    @permission PROJECT_VIEW_QUESTIONNAIRE
    """
    # A single scalar lookup both proves the section exists and yields its project,
    # so no second fetch of the Section row is needed.
    project_id = (
        session.query(Section.project_id).filter(Section.id == section_id).scalar()
    )
    if project_id is None:
        raise NoResultFound(f"Section ID {section_id} not found")
    project = fetch.project(session, project_id)

    validate.check(
        user, perms.PROJECT_VIEW_QUESTIONNAIRE, project=project, section_id=section_id
    )

    subsections = fetch.get_subsections_recursive(session, section_id).subquery()
    q = (
        session.query(
            Tag.id.label("tag_id"),
            # IF(..., id, NULL): GROUP_CONCAT skips NULLs, so only question ids from
            # rows with a NULL parent_lvl_1 end up in "qids".
            func.GROUP_CONCAT(
                func.IF(subsections.c.parent_lvl_1 == None, QuestionInstance.id, null())  # noqa: E711
            ).label("qids"),
            func.GROUP_CONCAT(subsections.c.parent_lvl_1).label("section_ids"),
        )
        .select_from(Tag)
        .join(tags_qinstances_table)
        .join(QuestionInstance)
        .join(subsections)
        .group_by(Tag.id)
    )
    return [
        serial.TagGroup(
            tag_id=r.tag_id,
            question_instance_ids=_split_ids(r.qids),
            section_ids=_split_ids(r.section_ids),
        )
        for r in q
    ]

91 

92 

@http
def post_tag(session: Session, user: User, tag_doc: serial.NewTag) -> serial.Tag:
    """
    Add a new Tag, built from the name and description in tag_doc, to the
    calling user's organisation.

    Raises DuplicateDataProvided when the organisation already has a tag with
    the same name.

    @permission MANAGE_ORGANISATION
    """
    validate.check(user, perms.MANAGE_ORGANISATION, target_org=user.organisation)
    new_tag = Tag(name=tag_doc.name, description=tag_doc.description)

    same_name_tags = (
        session.query(Tag)
        .filter(Tag.name == new_tag.name, Tag.organisation == user.organisation)
        .all()
    )
    if same_name_tags:
        raise DuplicateDataProvided(f"tag '{new_tag.name}' already exists")

    user.organisation.tags.append(new_tag)
    # Flush before serialising so the returned document is built from the flushed row.
    session.flush()
    return serial.Tag.model_validate(new_tag)

113 

114 

@http
def put_tag(
    session: Session, user: User, tag_id: int, tag_doc: serial.NewTag
) -> serial.Tag:
    """
    Update the name and description of an existing tag

    The tag is looked up by ID within the calling user's organisation; the
    exception raised by Query.one() propagates if no such tag exists.

    Raises DuplicateDataProvided if a different tag of the same organisation
    already uses the requested name, matching the check made by post_tag.

    @permission MANAGE_ORGANISATION
    """
    validate.check(user, perms.MANAGE_ORGANISATION, target_org=user.organisation)
    tag = session.query(Tag).filter(Tag.id == tag_id, Tag.org_id == user.org_id).one()

    # Run the uniqueness check before mutating `tag`. The tag being edited is
    # excluded so that saving it under its current name is still allowed.
    name_taken = (
        session.query(Tag.id)
        .filter(Tag.org_id == user.org_id)
        .filter(Tag.name == tag_doc.name)
        .filter(Tag.id != tag_id)
        .first()
    )
    if name_taken is not None:
        raise DuplicateDataProvided(f"tag '{tag_doc.name}' already exists")

    tag.name = tag_doc.name
    tag.description = tag_doc.description
    return serial.Tag.model_validate(tag)

129 

130 

@http
def delete_tag(session: Session, user: User, tag_id: int):
    """
    Delete the tag with the given ID, provided it belongs to the calling
    user's organisation.

    Only session.delete() is called here; removal of references to the tag from
    related items is presumably handled by the model's cascade rules - confirm
    against rfpy.model.tags.

    @permission MANAGE_ORGANISATION
    """
    validate.check(user, perms.MANAGE_ORGANISATION, target_org=user.organisation)
    owned_tag_query = (
        session.query(Tag)
        .filter(Tag.id == tag_id)
        .filter(Tag.org_id == user.org_id)
    )
    session.delete(owned_tag_query.one())

142 

143 

def _tag_assign(
    session: Session,
    user: User,
    tag_id: int,
    tag_assigns_doc: serial.TagAssigns,
    action_func: Callable,
) -> List[int]:
    """
    Processes a TagAssigns document listing question ids to be assigned/unassigned (linked/unlinked)
    to the tag with the given ID. Takes care of checking that the question IDs belong to projects
    in which the current user's organisation is a Participant.

    Action to assign or unassign the tag is delegated to the 'action_func' callable, which is
    called as action_func(tag, question_instance_query, pending_ids) and is expected to remove
    from the 'pending_ids' set the id of every question instance it processes.

    N.B. tag_assigns_doc.question_instance_ids is extended in place with the question ids
    resolved from tag_assigns_doc.section_ids.

    Returns the de-duplicated list of question instance ids that were requested.

    Raises ValueError if any requested id is left in 'pending_ids' after action_func has run,
    i.e. it was not returned by the visibility-restricted query.
    """

    validate.check(user, perms.MANAGE_ORGANISATION, target_org=user.organisation)

    # Check tag exists for org
    tag = session.query(Tag).filter(Tag.id == tag_id, Tag.org_id == user.org_id).one()

    _add_questions_from_sections(session, user, tag_assigns_doc)

    instance_ids = set(tag_assigns_doc.question_instance_ids)
    # Snapshot taken before action_func starts removing ids from the set.
    provided_ids = list(instance_ids)

    # Participant must be joined *through* Project. Joining it on org_id alone would
    # match every question instance as soon as the organisation participates in any
    # project. This is the same join pattern used by _add_questions_from_sections.
    qi_q = (
        session.query(QuestionInstance)
        .select_from(QuestionInstance)
        .join(Project, Project.id == QuestionInstance.project_id)
        .join(Participant)
        .filter(Participant.org_id == user.org_id)
        # Use the immutable snapshot so the IN list cannot change while
        # action_func mutates 'instance_ids'.
        .filter(QuestionInstance.id.in_(provided_ids))
    )

    action_func(tag, qi_q, instance_ids)

    if instance_ids:
        unmatched = ", ".join(str(i) for i in instance_ids)
        raise ValueError(
            f"The following Question Instance ids were not found: {unmatched}"
        )

    return provided_ids

186 

187 

def _add_questions_from_sections(
    session: Session, user: User, tag_assigns_doc: serial.TagAssigns
) -> None:
    """
    Look up question instance ids for the section IDs provided. Extend, in place, the list of
    question_instance_ids values in tag_assigns_doc.

    Only sections of projects in which the user's organisation is a Participant are
    considered; other section IDs are silently ignored.

    When tag_assigns_doc.recursive is set, every question instance of the section's project
    whose `number` starts with the section's `number` is included (presumably `number`
    encodes the position in the section tree - confirm against the model). Otherwise only
    question instances whose `section` is the section itself are included.
    """
    if not tag_assigns_doc.section_ids:
        # Nothing to resolve - skip the query with an empty IN list.
        return

    qids = tag_assigns_doc.question_instance_ids
    visible_sections = (
        session.query(Section)
        .join(Project, Project.id == Section.project_id)
        .join(Participant)
        .filter(Participant.org_id == user.org_id)
        .filter(Section.id.in_(tag_assigns_doc.section_ids))
    )
    for sec in visible_sections:
        if tag_assigns_doc.recursive:
            q = (
                session.query(QuestionInstance.id)
                .filter(QuestionInstance.project_id == sec.project_id)
                .filter(QuestionInstance.number.startswith(sec.number))
            )
        else:
            q = session.query(QuestionInstance.id).filter(
                QuestionInstance.section == sec
            )
        qids.extend(r.id for r in q)

214 

215 

@http
def post_tag_assign(
    session: Session, user: User, tag_id: int, tag_assigns_doc: serial.TagAssigns
) -> List[int]:
    """
    Link the Tag with the given ID to each question instance identified in the
    TagAssigns document.

    Question instances that already carry the tag are left untouched.

    The value returned by _tag_assign (the list of question IDs that were
    requested) is returned.

    @permission MANAGE_ORGANISATION
    """

    def _link(tag, question_instances, pending_ids):
        # Tick each processed id off the pending set; _tag_assign treats whatever
        # remains in that set as "not found".
        for instance in question_instances:
            pending_ids.remove(instance.id)
            if tag in instance.tags:
                continue
            instance.tags.append(tag)

    return _tag_assign(session, user, tag_id, tag_assigns_doc, _link)

238 

239 

@http
def delete_tag_assign(
    session: Session, user: User, tag_id: int, tag_assigns_doc: serial.TagAssigns
) -> List[int]:
    """
    Unlink the Tag with the given ID from each question instance identified in
    the TagAssigns document.

    Question instances that do not carry the tag are left untouched.

    The value returned by _tag_assign (the list of question IDs that were
    requested) is returned.

    @permission MANAGE_ORGANISATION
    """

    def _unlink(tag, question_instances, pending_ids):
        # Tick each processed id off the pending set; _tag_assign treats whatever
        # remains in that set as "not found".
        for instance in question_instances:
            pending_ids.remove(instance.id)
            if tag not in instance.tags:
                continue
            instance.tags.remove(tag)

    return _tag_assign(session, user, tag_id, tag_assigns_doc, _unlink)