Coverage for rfpy/api/endpoints/tags.py: 82%

85 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-31 16:00 +0000

1''' 

2Manage tags - keywords that can be used to categorize questions 

3''' 

4from rfpy.api import fetch 

5from typing import Callable, List 

6 

7from sqlalchemy import func, null 

8from sqlalchemy.orm import Session 

9from sqlalchemy.orm.exc import NoResultFound 

10 

11from rfpy.api import validate 

12from rfpy.auth import perms 

13from rfpy.suxint import http 

14from rfpy.model import User, Organisation, Project, Participant, Section, QuestionInstance 

15from rfpy.model.tags import tags_qinstances_table, Tag 

16from rfpy.model.exc import DuplicateDataProvided 

17from rfpy.web import serial 

18 

19 

@http
def get_tags(session: Session, user: User) -> List[serial.Tag]:
    '''
    Get an array of all the Tags defined by your organization

    Tags are selected by joining to Organisation and matching the organisation
    id against user.org_id; each row is serialised with serial.Tag.from_orm.
    '''
    org_tags = (
        session.query(Tag)
        .join(Organisation)
        .filter(Organisation.id == user.org_id)
    )
    return list(map(serial.Tag.from_orm, org_tags))

30 

31 

@http
def get_tags_section(session: Session, user: User, section_id: int) -> List[serial.TagGroup]:
    '''
    Get an array of objects, one for each tag, listing the question instance ids and
    section ids associated with that tag for the given section_id.

    Each returned object has the keys 'tag_id', 'question_instance_ids' and 'section_ids'.

    N.B. - question instance ids are only reported for rows of the subsection query whose
    parent_lvl_1 column is NULL; for all other rows the parent_lvl_1 value is reported in
    'section_ids' instead. Presumably NULL parent_lvl_1 marks questions that are direct
    children of the given section - confirm against fetch.get_subsections_recursive.

    The user must hold PROJECT_VIEW_QUESTIONNAIRE for the section's project (enforced by
    validate.check).

    Raises NoResultFound if no project id can be found for section_id.
    '''
    # A single scalar lookup both proves the section exists and yields its project;
    # no second fetch of the Section row is needed.
    project_id = (
        session.query(Section.project_id)
        .filter(Section.id == section_id)
        .scalar()
    )
    if project_id is None:
        raise NoResultFound(f'Section ID {section_id} not found')
    project = fetch.project(session, project_id)

    validate.check(user, perms.PROJECT_VIEW_QUESTIONNAIRE,
                   project=project, section_id=section_id)

    subsections = fetch.get_subsections_recursive(
        session, section_id).subquery()

    # '== None' is intentional: SQLAlchemy overloads the operator to build the SQL
    # null test, so it must not be rewritten as a Python 'is None'.
    is_direct_child = subsections.c.parent_lvl_1 == None  # noqa: E711
    q = (
        session.query(
            Tag.id.label('tag_id'),
            # GROUP_CONCAT skips NULLs, so only ids passing the IF() are collected
            func.GROUP_CONCAT(
                func.IF(is_direct_child, QuestionInstance.id, null())
            ).label('qids'),
            func.GROUP_CONCAT(subsections.c.parent_lvl_1).label('section_ids'),
        )
        .select_from(Tag)
        .join(tags_qinstances_table)
        .join(QuestionInstance)
        .join(subsections)
        .group_by(Tag.id)
    )
    # The aggregated columns arrive as comma separated strings (or NULL when empty)
    return [
        {
            'tag_id': r.tag_id,
            'question_instance_ids': [int(qid) for qid in r.qids.split(',')] if r.qids else [],
            'section_ids': [int(sid) for sid in r.section_ids.split(',')] if r.section_ids else []
        }
        for r in q
    ]

77 

78 

@http
def post_tag(session: Session, user: User, tag_doc: serial.NewTag) -> serial.Tag:
    '''
    Create a new Tag for your organization

    Raises DuplicateDataProvided if the organisation already has a tag with the
    same name. The session is flushed before the new tag is serialised.

    @permission MANAGE_ORGANISATION
    '''
    validate.check(user, perms.MANAGE_ORGANISATION,
                   target_org=user.organisation)

    new_tag = Tag(**tag_doc)
    same_name_tags = (
        session.query(Tag)
        .filter(Tag.name == new_tag.name)
        .filter(Tag.organisation == user.organisation)
        .all()
    )
    if same_name_tags:
        raise DuplicateDataProvided(f"tag '{new_tag.name}' already exists")

    user.organisation.tags.append(new_tag)
    session.flush()
    return serial.Tag.from_orm(new_tag)

98 

99 

@http
def put_tag(session: Session, user: User, tag_id: int, tag_doc: serial.NewTag) -> serial.Tag:
    '''
    Update the name and description of an existing tag

    The tag is looked up by id within the current user's organisation using
    Query.one(), which raises if there is no matching row.

    @permission MANAGE_ORGANISATION
    '''
    validate.check(user, perms.MANAGE_ORGANISATION,
                   target_org=user.organisation)

    owned_tag_query = (
        session.query(Tag)
        .filter(Tag.id == tag_id)
        .filter(Tag.org_id == user.org_id)
    )
    existing = owned_tag_query.one()

    # Copy the editable fields across in a fixed order: name, then description
    for field in ('name', 'description'):
        setattr(existing, field, tag_doc[field])

    return serial.Tag.from_orm(existing)

115 

116 

@http
def delete_tag(session: Session, user: User, tag_id: int):
    '''
    Delete the tag with the given ID and remove all references to that tag
    from related items.

    Only session.delete() is called here; removal of references is presumably
    handled by the model's relationship configuration - not visible in this module.

    The tag is looked up within the current user's organisation using Query.one(),
    which raises if there is no matching row.

    @permission MANAGE_ORGANISATION
    '''
    validate.check(user, perms.MANAGE_ORGANISATION,
                   target_org=user.organisation)

    owned_tag_query = (
        session.query(Tag)
        .filter(Tag.id == tag_id)
        .filter(Tag.org_id == user.org_id)
    )
    doomed = owned_tag_query.one()
    session.delete(doomed)

131 

132 

def _tag_assign(
        session: Session,
        user: User,
        tag_id: int,
        tag_assigns_doc: serial.TagAssigns,
        action_func: Callable):
    '''
    Processes a TagAssigns document listing question ids to be assigned/unassigned (linked/unlinked)
    to the tag with the given ID. Takes care of checking that the question IDs belong to projects
    in which the current user's organisation is a participant.

    Action to assign or unassign the tag is delegated to the 'action_func' callable, which is
    invoked as action_func(tag, question_instance_query, instance_id_set). It is expected to
    remove from instance_id_set the id of every question instance it processes: any ids still
    in the set afterwards are treated as not found.

    Note that tag_assigns_doc['question_instance_ids'] is extended in place with the ids
    resolved from tag_assigns_doc['section_ids'].

    Returns a list of the (de-duplicated) question instance ids that were requested.

    Raises ValueError if any requested question instance id was not matched by the query.
    '''

    validate.check(user, perms.MANAGE_ORGANISATION,
                   target_org=user.organisation)

    # Check tag exists for org - Query.one() raises if there is no such row
    tag = session.query(Tag)\
        .filter(Tag.id == tag_id, Tag.org_id == user.org_id)\
        .one()

    _add_questions_from_sections(session, user, tag_assigns_doc)

    instance_ids = set(tag_assigns_doc['question_instance_ids'])
    # Snapshot taken before action_func starts removing matched ids from the set
    provided_ids = list(instance_ids)

    # Joins are chained one target per call; passing several targets to a single
    # join() is a legacy form deprecated in SQLAlchemy 1.4.
    qi_q = (
        session.query(QuestionInstance)
        .join(Project)
        .join(Participant)
        .filter(Participant.org_id == user.org_id)
        .filter(QuestionInstance.id.in_(instance_ids))
    )

    action_func(tag, qi_q, instance_ids)

    # Anything left over was requested but not returned by the query
    if instance_ids:
        unmatched = ', '.join(str(i) for i in instance_ids)
        raise ValueError(
            f'The following Question Instance ids were not found: {unmatched}')

    return provided_ids

173 

174 

def _add_questions_from_sections(session: Session, user: User, tag_assigns_doc: serial.TagAssigns):
    '''
    Resolve the section IDs in tag_assigns_doc to question instance ids and append them,
    in place, to tag_assigns_doc['question_instance_ids'].

    Only sections belonging to projects in which the user's organisation is a participant
    are considered. When tag_assigns_doc['recursive'] is truthy, question instances are
    matched by project and by their number starting with the section's number; otherwise
    only question instances whose section is that section are matched.
    '''
    collected_ids = tag_assigns_doc['question_instance_ids']
    visible_sections = (
        session.query(Section)
        .join(Project, Project.id == Section.project_id)
        .join(Participant)
        .filter(Participant.org_id == user.org_id)
        .filter(Section.id.in_(tag_assigns_doc['section_ids']))
    )

    for section in visible_sections:
        id_query = session.query(QuestionInstance.id)
        if tag_assigns_doc['recursive']:
            id_query = (
                id_query
                .filter(QuestionInstance.project_id == section.project_id)
                .filter(QuestionInstance.number.startswith(section.number))
            )
        else:
            id_query = id_query.filter(QuestionInstance.section == section)

        for row in id_query:
            collected_ids.append(row.id)

196 

197 

@http
def post_tag_assign(session: Session, user: User, tag_id: int, tag_assigns_doc) -> List[int]:
    '''
    Assign the Tag with the given ID to the question instances with ID values
    provided in the TagAssigns document.

    Duplicate assignments are silently ignored.

    An array of assigned question IDs is returned.

    @permission MANAGE_ORGANISATION
    '''
    def link(tag, matched_instances, pending_ids):
        # Tick each matched instance off the pending set, then attach the tag
        # unless the instance already carries it.
        for instance in matched_instances:
            pending_ids.remove(instance.id)
            if tag in instance.tags:
                continue
            instance.tags.append(tag)

    return _tag_assign(session, user, tag_id, tag_assigns_doc, link)

217 

218 

@http
def delete_tag_assign(session: Session, user: User, tag_id: int, tag_assigns_doc) -> List[int]:
    '''
    Un-assign the Tag with the given ID from the question instances with ID values
    provided in the TagAssigns document.

    Question instances that do not carry the tag are left untouched.

    An array of un-assigned question IDs is returned.

    @permission MANAGE_ORGANISATION
    '''
    def unlink(tag, matched_instances, pending_ids):
        # Tick each matched instance off the pending set, then detach the tag
        # if the instance currently carries it.
        for instance in matched_instances:
            pending_ids.remove(instance.id)
            if tag not in instance.tags:
                continue
            instance.tags.remove(tag)

    return _tag_assign(session, user, tag_id, tag_assigns_doc, unlink)