Coverage for rfpy/suxint.py: 99%

457 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-24 10:52 +0000

1import re 

2import inspect 

3import logging 

4import importlib 

5import textwrap 

6from itertools import chain 

7from collections import defaultdict 

8from typing import Iterable, Callable, Optional, Sequence 

9from inspect import Parameter, Signature 

10 

11 

12from apispec import APISpec 

13from pydantic import BaseModel 

14 

15from pydantic.json_schema import models_json_schema 

16 

17from rfpy.web.ext.apispec import SuxPlugin 

18from rfpy.web.ext.openapi_types import openapi_type_mapping 

19 

20 

class RoutingError(AttributeError):
    """
    Raised for routing failures, e.g. an invalid handler name, an unusable
    handler/adaptor module or a request that has no matching handler.
    """

23 

24 

# Pattern used (via .match) to decide whether a function name is a valid handler name:
# an HTTP verb followed by an underscore, e.g. get_things or post_things.
# NOTE(review): the trailing "|get" alternative is not anchored at the end, so any
# name that merely begins with "get" also matches - presumably it was added to
# allow a root handler called "get"; confirm whether this looseness is intended.
HANDLER_PREFIX = re.compile(r"^(get|post|put|delete)_|get")

# Default pattern for URL path segments treated as arguments: digits or ":word"
ARG_REGEX = r"/(\d+|:\w+)"

log = logging.getLogger(__name__)

29 

30 

def _set_exposed(handler):
    """
    Flag a callable as reachable over http by setting http_exposed = True on it.

    Raises RoutingError if the callable's name does not match HANDLER_PREFIX.
    """
    name = handler.__name__
    if HANDLER_PREFIX.match(name) is None:
        message = (
            f"{name} is not a valid name for a handler function."
            " Must start with get, post, put or delete"
        )
        raise RoutingError(message)
    setattr(handler, "http_exposed", True)

39 

40 

def http(handler):
    """
    Decorator marking a handler callable as http accessible.

    The callable's name is validated and http_exposed = True is set on it;
    the same callable is returned.
    """
    _set_exposed(handler)
    return handler

47 

48 

def http_etag(handler):
    """
    Decorator marking a handler callable as http accessible and etag enabled.

    Sets http_exposed = True (after validating the name) and etag = True on
    the callable, which is returned unchanged otherwise.
    """
    _set_exposed(handler)
    setattr(handler, "etag", True)
    return handler

56 

57 

class Sux(object):
    """
    Sux is a dispatcher. It inspects an http request, derives the name of a
    callable from the request method and path, and invokes that callable. The
    motivation is to avoid maintaining a routes table which maps regular
    expressions to handler functions.

    Handler Matching
        The handler name is the lower-cased http method followed by the URL
        path with slashes converted to underscores

        GET /this/that/     ->  get_this_that
        POST /ding/dong/    ->  post_ding_dong

        A simple heuristic is used for URL path arguments: anything that looks
        like an argument is stripped out of the path before the handler name
        is derived. The stripping is done with a regular expression
        (self.arg_regex).

        The default regex is r'/(\\d+|:\\w+)' - i.e. path elements made of
        digits, or words that start with a colon.

        GET /this/23/that/  ->  get_this_that
        GET /some/:cake     ->  get_some

    Function arguments and invocation

        Sux introspects the handler's arguments. For each argument name Sux
        looks up an attribute of the same name in the adaptor module - the
        "adaptor". An adaptor is called with the http request and returns the
        value to pass to the handler. Thus the handler is decoupled from http.

        Example
        1 http request                          GET /this/23/that?user_name=bob

        2 derive handler name                   get_this_that

        3 extract the handler's argument names  this_id, user_name

        4 look up the adaptors                  this_id, user_name

        5 call adaptors, collect return values  [23, 'bob']

        6 invoke handler positionally           get_this_that(23, 'bob')

    @param handler_module - string module name or actual module containing
                            handler (end point) functions

    @param adaptor_module - string module name or actual module containing adaptor
                            functions. Defaults to the handler module.

    @param models_module - string name of module providing Pydantic schema models

    @param arg_regex - regular expression (string) defining which url path elements
                        should be treated as arguments. Defaults to ARG_REGEX.

    @param mount_depth - number of times request.path_info_pop() is called before
                        the handler name is derived, so that the path elements
                        Sux is mounted under are ignored. Default is 1.

    @param api_name - The name of this API for documentation purposes

    @param validate - boolean to determine whether Sux should check that the handler
                        module contains at least one valid handler. Set to False
                        for easier unit testing.
    """

    def __init__(
        self,
        handler_module,
        adaptor_module=None,
        models_module="rfpy.web.serial",
        arg_regex=None,
        mount_depth=1,
        api_name="PostRFP API",
        validate=True,
    ):
        pattern = ARG_REGEX if arg_regex is None else arg_regex
        self.models_module = models_module
        self.arg_regex = re.compile(pattern)
        self.mount_depth = mount_depth
        self.api_name = api_name
        self.handler_module = None
        self.adaptor_module = None
        # module name -> module, for sub modules found to contain exposed handlers
        self.modules = {}

        # OpenAPI document, built on first request for swagger.json
        self._spec_dict = None

        # handler function name -> handler function
        self._handler_cache = {}
        # handler function -> list of its argument names
        self._handler_argspec_cache = {}

        self.load_handler_module(handler_module)
        self.load_adaptors(adaptor_module)
        self.load_handler_cache()

        if validate and not self.is_handler_module_valid():
            raise RoutingError(
                "handler module does not contain any valid handler functions"
            )

    def is_handler_module_valid(self):
        """True if at least one handler was found when the cache was loaded"""
        return bool(self._handler_cache)

    def load_handler_module(self, handler_module):
        """Set self.handler_module from a module object or a module name"""
        if inspect.ismodule(handler_module):
            self.handler_module = handler_module
            return
        if isinstance(handler_module, str):
            self.handler_module = importlib.import_module(handler_module)
            return
        raise RoutingError("handler_module must be a module or string module name")

    def load_handler_cache(self):
        """Populate the name -> function cache from iter_handlers()"""
        self._handler_cache.update(
            (handler.name, handler.func) for handler in self.iter_handlers()
        )

    def load_adaptors(self, adaptor_module):
        """
        Set self.adaptor_module from a module object or a module name,
        falling back to the handler module when None is given
        """
        if adaptor_module is None:
            self.adaptor_module = self.handler_module
            return
        if inspect.ismodule(adaptor_module):
            self.adaptor_module = adaptor_module
            return
        if isinstance(adaptor_module, str):
            self.adaptor_module = importlib.import_module(adaptor_module)
            return
        raise RoutingError("adaptor_module must be a module or string module name")

    def extract_handler_name(self, request):
        """
        Converts an http request method and path into a handler function name

        - Removes anything that looks like a path argument (according to arg_regex)
        - Replaces '/' with '_' characters

        e.g GET /project/1/people/ => ('get_project_people', 1)

        Returns a tuple of (function name, number of path arguments removed)
        """
        verb = request.method.lower()
        # remove argument-like elements (and their preceding slashes) from the path
        stripped_path, arg_count = self.arg_regex.subn("", request.path_info)
        suffix = stripped_path.strip("/").replace("/", "_")

        if suffix:
            return ("%s_%s" % (verb, suffix), arg_count)

        # a call to the root '/', i.e. GET / maps to def get():
        return (verb, arg_count)

    def no_matching_handler(self, handler_name, request):
        """
        Called when no handler is cached under handler_name.

        Returns the (lazily created, then cached) OpenAPI spec dict if the next
        path element is swagger.json, otherwise raises RoutingError.
        """
        if request.path_info_peek() != "swagger.json":
            raise RoutingError(
                'Handler "%s" not found in module "%s"'
                % (handler_name, self.handler_module.__name__)
            )

        if self._spec_dict is None:
            mount_path = request.script_name.rstrip("/") + "/"
            self._spec_dict = create_spec(self, path=mount_path).to_dict()
        return self._spec_dict

    def args_for_handler(self, handler_func):
        """
        Return the names of the positional-or-keyword parameters declared by
        the given callable. Results are cached per callable.
        """
        cache = self._handler_argspec_cache
        if handler_func in cache:
            return cache[handler_func]

        declared = inspect.signature(handler_func).parameters.values()
        names = [
            param.name
            for param in declared
            if param.kind is Parameter.POSITIONAL_OR_KEYWORD
        ]
        if not inspect.isfunction(handler_func):
            # not a plain function: drop the first argument (allow for self)
            names = names[1:]
        cache[handler_func] = names
        return names

    def __call__(self, original_request):
        """
        Dispatch the request: resolve the handler, build its arguments by
        calling the matching adaptors and return the handler's return value.

        Raises RoutingError if the handler is missing, not exposed or if the
        path arguments in the URL do not line up with the handler's PathArg
        adaptors. Raises AttributeError if an adaptor cannot be found.
        """
        # work on a copy so that popping path elements leaves the original intact
        request = original_request.copy()
        for _ in range(self.mount_depth):
            request.path_info_pop()

        handler_name, unconsumed = self.extract_handler_name(request)

        if handler_name not in self._handler_cache:
            return self.no_matching_handler(handler_name, request)
        handler = self._handler_cache[handler_name]

        if not getattr(handler, "http_exposed", False):
            raise RoutingError("Handler %s is not http accessible" % handler)
        if getattr(handler, "etag", False):
            original_request.generate_etag = True

        call_args = []

        for arg_name in self.args_for_handler(handler):
            try:
                adaptor = getattr(self.adaptor_module, arg_name)
            except AttributeError:
                raise AttributeError(
                    'Adaptor "%s" not found in module "%s"'
                    % (arg_name, self.adaptor_module.__name__)
                )

            if isinstance(adaptor, PathArg):
                if unconsumed > 0:
                    # this adaptor accounts for one of the path arguments in the URL
                    unconsumed -= 1
                elif unconsumed == 0:
                    msg = (
                        f'Path argument "{adaptor.arg_name}" not found'
                        f" for function {handler.__name__}"
                    )
                    log.error(msg)
                    raise RoutingError(msg)

            call_args.append(adaptor(request))

        if unconsumed > 0:
            # check that get_example_path() doesn't answer to GET /example/path/1/
            # or GET /example/2/path
            raise RoutingError(
                f'Handler function for Request "{request.path}" '
                "must consume all path arguments"
            )

        return handler(*call_args)

    def iter_handler_modules(self):
        """
        Yield (name, module) pairs: the handler module itself under the name
        "__root__", then every module that is a member of the handler module
        """
        yield ("__root__", self.handler_module)
        yield from inspect.getmembers(self.handler_module, inspect.ismodule)

    def iter_handlers(self) -> Iterable["Handler"]:
        """
        Yield a Handler for every function in the handler modules whose name
        matches HANDLER_PREFIX and which is flagged http_exposed.

        Side effect: non root modules containing such a function are recorded
        in self.modules.
        """
        for mod_name, mod in self.iter_handler_modules():
            is_root = mod_name == "__root__"
            for func_name, func in inspect.getmembers(mod, inspect.isfunction):
                if not HANDLER_PREFIX.match(func_name):
                    continue
                if not getattr(func, "http_exposed", False):
                    continue
                if not is_root:
                    self.modules.setdefault(mod_name, mod)
                tag = None if is_root else mod_name.capitalize()
                yield Handler(func_name, func, self.adaptor_module, tag=tag)

    def handler_by_name(self, handler_name):
        """Return the first Handler with the given name, or None if there is none"""
        matching = (h for h in self.iter_handlers() if h.name == handler_name)
        return next(matching, None)

310 

311 

312class Handler: 

313 """ 

314 Provides a wrapper around a real api endpoint (function or method) 

315 

316 Used primarily for generating documentation 

317 """ 

318 

319 def __init__( 

320 self, name: str, func: Callable, adaptor_mod, tag: Optional[str] = None 

321 ): 

322 self.name = name 

323 self.func = func 

324 self.adaptor_mod = adaptor_mod 

325 self.name_elements = name.split("_") 

326 self._signature = None 

327 self._retval = None 

328 self._docs = DocString(func) 

329 self.tag = tag 

330 

331 @property 

332 def method(self): 

333 """HTTP method, e.g. 'GET' or 'POST""" 

334 return self.name_elements[0].upper() 

335 

336 @property 

337 def path(self): 

338 """ 

339 Show the documentation path for this Handler 

340 

341 This effectively reverses the process that Sux follows - it constructs the 

342 'real' HTTP path by interleave the function name and the arguments to build 

343 a URL. so get_this_that(this_id) becomes /this/{this_id}/that/ 

344 

345 adaptor.name is used to match incoming http request path elements 

346 adaptor_map will look like { 

347 'project': project_id() - an ArgExtractor function which extracts 

348 the project id from the path 

349 'issue': issue_id() 

350 } 

351 """ 

352 adaptor_map = { 

353 adaptor.arg_name: adaptor for arg_name, adaptor in self.path_params() 

354 } 

355 

356 def lookup(match): 

357 path_part = match.groups()[0] 

358 if path_part in adaptor_map: 

359 adaptor = adaptor_map[path_part] 

360 # Interleave the parameter name into the function name derived path 

361 return "/%s/{%s}" % (path_part, adaptor.doc_name) 

362 return "/" + path_part 

363 

364 unparameterised_path = "/" + "/".join(self.name_elements[1:]) 

365 return re.sub(r"/(\w+)", lookup, unparameterised_path) 

366 

367 def path_params(self): 

368 """Generator which yields all PathArg arguments for this handler""" 

369 for arg_name in self.required_arguments: 

370 adaptor = getattr(self.adaptor_mod, arg_name, None) 

371 if adaptor is not None and isinstance(adaptor, PathArg): 

372 yield arg_name, adaptor 

373 

374 def adaptors(self): 

375 for arg_name in chain(self.required_arguments, self.optional_arguments): 

376 adaptor = getattr(self.adaptor_mod, arg_name, None) 

377 yield adaptor 

378 

379 @property 

380 def parameters(self): 

381 """Returns a list in inspect.Parameter objects for this handler's function""" 

382 if getattr(self, "_signature", None) is None: 

383 self._signature = inspect.signature(self.func) 

384 return self._signature.parameters.values() 

385 

386 @property 

387 def required_arguments(self): 

388 """The list of non kwarg argument names for the wrapped func""" 

389 return [p.name for p in self.parameters if p.default is Signature.empty] 

390 

391 @property 

392 def optional_arguments(self): 

393 """A list of optional (keyword) arguments for the wrapped function""" 

394 return [p.name for p in self.parameters if p.default is not Signature.empty] 

395 

396 @property 

397 def docs(self): 

398 info = self._docs.intro 

399 perms = self.permissions 

400 if perms: 

401 info += "\n\nPermissions: " + perms 

402 errs = self.errors 

403 if errs: 

404 info += "\n\nErrors: " + errs 

405 return info 

406 

407 @property 

408 def permissions(self): 

409 if not self._docs.permissions: 

410 return None 

411 return ", ".join(self._docs.permissions) 

412 

413 @property 

414 def errors(self): 

415 if not self._docs.permissions: 

416 return None 

417 return ", ".join(self._docs.errors) 

418 

419 @property 

420 def return_annotation(self): 

421 from typing import get_args 

422 

423 retval = self.return_value 

424 try: 

425 return get_args(retval)[1] 

426 except IndexError: 

427 return None 

428 

429 @property 

430 def return_value(self): 

431 if self._retval is None: 

432 self._retval = inspect.get_annotations(self.func).get("return", None) 

433 return self._retval 

434 

435 def __repr__(self): 

436 return ( 

437 f"<suxint.Handler '{self.name}' Method: {self.method}, Path: {self.path}>" 

438 ) 

439 

440 

class DocString:
    """
    Parses a function's docstring, collecting "@name value" lines.

    The docstring is dedented and stripped (an absent docstring is treated as
    an empty string). Every "@name value" occurrence is stored, in order, as a
    (name, value) tuple in param_pairs.
    """

    def __init__(self, func):
        raw = func.__doc__
        self.docstring = "" if raw is None else textwrap.dedent(raw).strip()
        self.param_pairs = re.findall(r"@(\w+)\s+(.*)$", self.docstring, re.MULTILINE)

    def _key(self, key):
        """All values recorded for the given @name, in docstring order"""
        return [value for name, value in self.param_pairs if name == key]

    @property
    def intro(self):
        """The docstring text which precedes the first "@" character"""
        return self.docstring.partition("@")[0].strip()

    @property
    def permissions(self):
        """List of the @permissions values"""
        return self._key("permissions")

    @property
    def errors(self):
        """List of the @raises values"""
        return self._key("raises")

466 

467 

# Lower-cased string values which ArgExtractor.typed() converts to True for
# arguments of type "boolean"; any other value converts to False
truthy_strings = {"true", "1", "yes"}

469 

470 

class ArgExtractor(object):
    """
    Base class for Argument Extractor classes

    Instances of ArgExtractor are callables that extract a value
    from an HTTP request

    e.g. GetArg('name') produces a callable that knows how to extract the
    GET parameter 'name' from an http request

    Subclasses must implement extract(request).

    @param arg_name - name of the value to extract from the request
    @param validator - optional callable; the extracted value is rejected with a
                       ValueError if validator(value) is falsy
    @param doc - description of the argument for the OpenAPI documentation
    @param default - default value, used by subclasses when the argument is absent
    @param arg_type - "int", "float" or "boolean"; any other value results in
                      typed() returning a string
    @param converter - optional callable applied to the validated value; its
                       return value is what the handler receives
    @param enum_values - optional set, list or tuple of permitted values
    @param required - if not None, overrides swagger_required in the
                      documentation; some subclasses also enforce it when
                      extracting
    """

    # value of the OpenAPI parameter "in" field, set by subclasses
    swagger_in: Optional[str] = None
    swagger_required = True

    def __init__(
        self,
        arg_name,
        validator=None,
        doc=None,
        default=None,
        arg_type="int",
        converter=None,
        enum_values=None,
        required=None,
    ) -> None:
        self.arg_name = arg_name
        self.doc = doc
        self.default = default
        self.arg_type = arg_type
        self.converter = converter
        self.required = required
        if enum_values is not None and not isinstance(enum_values, (set, list, tuple)):
            raise ValueError("enum_values must be a set, list or tuple")
        self.enum_values = enum_values
        if callable(validator):
            self.validator = validator
        elif validator is not None:
            raise ValueError("validator, if given, must be a callable")
        # subclasses might define a validator, don't overwrite it
        if not hasattr(self, "validator"):
            self.validator = None

    def validate(self, value):
        """Raise ValueError if a validator is set and it rejects the value"""
        if self.validator:
            if not self.validator(value):
                raise ValueError("%s is not a valid value" % value)

    def validate_enums(self, value):
        """
        Raise ValueError if enum_values is set and the value (or, for a list,
        set or tuple, any of its members) is not one of the permitted values
        """
        if self.enum_values is not None:
            if isinstance(value, (list, set, tuple)):
                for v in value:
                    if v not in self.enum_values:
                        raise ValueError(
                            f"Value -{v}- is not one of {self.enum_values}"
                        )
            elif value not in self.enum_values:
                raise ValueError(f"Value -{value}- is not one of {self.enum_values}")

    def typed(self, value):
        """
        Convert a raw request value according to arg_type. None is passed
        through; anything else is converted to a stripped string first.
        """
        if value is None:
            return None
        if not isinstance(value, str):
            value = str(value)
        value = value.strip()
        if self.arg_type == "int":
            return int(value)
        if self.arg_type == "float":
            return float(value)
        if self.arg_type == "boolean":
            return value.lower() in truthy_strings

        # lstrip is a workaround to enable strings as path arguments
        # to be recognised, e.g. /user/:bob/delete
        return value.lstrip(":")

    def __call__(self, request):
        """Extract, validate and (if a converter is set) convert the value"""
        val = self.extract(request)
        self.validate(val)
        if self.converter is not None:
            return self.converter(val)
        return val

    def extract(self, request):
        """Pull the raw value out of the request - implemented by subclasses"""
        raise NotImplementedError

    def update_openapi_path_object(self, path_object: dict):
        """
        Append an OpenAPI parameter description of this argument to
        path_object["parameters"]
        """
        spec = {
            "in": self.swagger_in,
            "required": self.swagger_required,
            "name": self.doc_name,
        }
        if self.required is not None:
            spec["required"] = self.required

        if self.doc is not None:
            spec["description"] = self.doc
        spec["schema"] = dict(type=self.swagger_type)
        if self.default is not None:
            spec["schema"]["default"] = self.default

        if getattr(self, "enum_values", None) is not None:
            # enum_values may be a set, which cannot be serialised to JSON,
            # so always document it as a list (as PostArg does)
            enum_values = list(self.enum_values)
            if "items" in spec["schema"]:
                spec["schema"]["items"]["enum"] = enum_values
            else:
                spec["schema"]["enum"] = enum_values

        self.augment_openapi_path_object(spec)

        path_object["parameters"].append(spec)

    def augment_openapi_path_object(self, spec: dict) -> None:
        """
        Subclasses can override this method to add additional information to the
        OpenAPI parameter description before it is added to the path object.
        """
        pass

    @property
    def swagger_type(self):
        """The OpenAPI type name corresponding to arg_type"""
        return openapi_type_mapping[self.arg_type]

    @property
    def doc_name(self):
        """The name of this Parameter for documentation purposes"""
        return self.arg_name

602 

603 

class PathArg(ArgExtractor):
    """
    Extracts an argument from the URL path: the path element which directly
    follows the element named arg_name, e.g. PathArg("project") extracts 23
    from /project/23/issues
    """

    swagger_in = "path"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # captures the path element after /<arg_name>/
        self.path_regex = re.compile(r".*/%s/([^/]+)/*" % self.arg_name)

    def extract(self, request):
        """
        Return the typed path element following arg_name in request.path_info.

        Raises ValueError if the path does not match or the captured value
        cannot be converted.
        """
        try:
            found = self.path_regex.match(request.path_info)
            return self.typed(found.group(1))
        except Exception:
            adaptor_class = self.__class__.__name__
            raise ValueError(
                f"{adaptor_class} Adaptor could not extract "
                f'value "{self.arg_name}" from path {request.path_info} '
                f"using regex {self.path_regex.pattern}"
            )

    @property
    def doc_name(self):
        """The name of this Parameter for documentation purposes"""
        return f"{self.arg_name}_id"

628 

629 

class GetArg(ArgExtractor):
    """
    Extracts a single query string parameter from request.GET.

    If the parameter is absent (and not required) or is blank, the typed
    default is returned. Otherwise the raw value is checked against
    enum_values and then converted with typed().
    """

    swagger_in = "query"
    swagger_required = False

    def extract(self, request):
        """
        Return the typed value of the query parameter.

        Raises ValueError if the parameter is required but absent, or if its
        value is not one of enum_values.
        """
        if self.arg_name not in request.GET:
            if self.required:
                # message previously contained a stray doubled quote
                raise ValueError(f"Query param '{self.arg_name}' must be provided")
            return self.typed(self.default)

        value = request.GET[self.arg_name]
        if value.strip() == "":
            return self.typed(self.default)

        # enum check is applied to the raw string, before type conversion
        self.validate_enums(value)
        return self.typed(value)

645 

646 

class GetArgSet(ArgExtractor):
    """
    Extracts a Set of argument values for the given GET arg

    Values are read from the query parameter "<arg_name>[]", or from
    "<arg_name>" if a parameter of that exact name is present.

    Additional keyword arguments:

    @param array_items_type - type of the array items for documentation
                              purposes, default "str"
    @param min_items - if set, the minimum number of values which must be given
    @param max_items - if set, the maximum number of values which may be given
    """

    swagger_in = "query"
    swagger_type = "array"

    def __init__(self, *args, **kwargs):
        self.array_items_type = kwargs.pop("array_items_type", "str")
        self.min_items = kwargs.pop("min_items", None)
        self.max_items = kwargs.pop("max_items", None)
        super().__init__(*args, **kwargs)

    def extract(self, request):
        """
        Return the set of typed values given for this parameter.

        Raises ValueError if the number of values is outside the
        min_items / max_items limits or if a value is not one of enum_values.
        """
        arg_array_name = f"{self.arg_name}[]"
        if self.arg_name in request.GET:
            arg_array_name = self.arg_name
        values = request.GET.getall(arg_array_name)
        if self.min_items is not None and len(values) < self.min_items:
            raise ValueError(
                f"At least {self.min_items} {self.arg_name} parameters required"
            )
        if self.max_items is not None and len(values) > self.max_items:
            raise ValueError(
                f"No more than {self.max_items} {self.arg_name} parameters permitted"
            )
        self.validate_enums(values)
        return {self.typed(v) for v in values}

    def augment_openapi_path_object(self, spec: dict) -> None:
        """
        Describe the parameter as an array of array_items_type, including the
        permitted item values and the minItems / maxItems limits if set.
        """
        if self.array_items_type is not None:
            items = {"type": openapi_type_mapping[self.array_items_type]}
            # the permitted values apply to the items, not to the array itself,
            # so carry any enum over before the schema is replaced
            enum_values = spec.get("schema", {}).get("enum")
            if enum_values is not None:
                items["enum"] = enum_values
            spec["schema"] = {"type": "array", "items": items}

        # The limits must be added after the schema has been rebuilt above,
        # otherwise they are thrown away along with the old schema
        if self.min_items is not None:
            spec["schema"]["minItems"] = self.min_items

        if self.max_items is not None:
            spec["schema"]["maxItems"] = self.max_items

687 

688 

def _get_or_create_schema(
    path_object: dict, mime_type="application/x-www-form-urlencoded"
):
    """
    Return the dict at requestBody -> content -> <mime_type> -> schema within
    path_object, creating any of the nested dicts which do not exist yet.
    """
    node = path_object
    for key in ("requestBody", "content", mime_type, "schema"):
        node = node.setdefault(key, {})
    return node

698 

699 

class PostArg(ArgExtractor):
    """
    Extracts a form parameter from request.POST, documented as a property of
    the multipart/form-data request body
    """

    swagger_in = "formData"
    swagger_required = False
    swagger_type = "string"
    doc_name = "body"

    def extract(self, request):
        """
        Return the typed value of the POST parameter (or of the default if it
        is absent), or None if there is no value.

        Raises ValueError if the typed value is not one of enum_values.
        """
        raw = request.POST.get(self.arg_name, self.default)
        if raw is None:
            return None

        tv = self.typed(raw)
        if self.enum_values and tv not in self.enum_values:
            raise ValueError(f"{tv} is not in {self.enum_values}")
        return tv

    def update_openapi_path_object(self, path_object: dict):
        """Add this argument to the properties of the request body schema"""
        schema = _get_or_create_schema(path_object, mime_type="multipart/form-data")
        schema.setdefault("type", "object")
        prop = {"type": self.swagger_type}
        if self.enum_values is not None:
            prop["enum"] = list(self.enum_values)
        schema.setdefault("properties", {})[self.arg_name] = prop

722 

723 

class PostFileArg(ArgExtractor):
    """
    Extracts an uploaded file parameter from request.POST, documented as a
    binary string property of the multipart/form-data request body
    """

    swagger_in = "formData"
    swagger_type = "string"
    doc_name = "body"

    def __init__(self, *args, **kwargs):
        # the argument type is always "file", whatever the caller passes
        kwargs["arg_type"] = "file"
        super().__init__(*args, **kwargs)

    def extract(self, request):
        """
        Return the POST value as is, None if it is absent.

        Raises ValueError if the parameter is required but absent.
        """
        uploaded = request.POST.get(self.arg_name, None)
        if self.required and uploaded is None:
            raise ValueError(f"Post Parameter {self.arg_name} must be provided")
        return uploaded

    def update_openapi_path_object(self, path_object: dict):
        """Add this argument to the properties of the request body schema"""
        schema = _get_or_create_schema(path_object, mime_type="multipart/form-data")
        schema.setdefault("type", "object")
        schema.setdefault("properties", {})[self.arg_name] = {
            "type": "string",
            "format": "binary",
        }

744 

745 

def downgrade_prop(prop):
    """
    Rewrite, in place, the keywords of a JSON schema property:

    - "const": x becomes "enum": [x]
    - a numeric "exclusiveMinimum" / "exclusiveMaximum" becomes the boolean
      True with the number moved to "minimum" / "maximum"
    """
    if "const" in prop:
        prop["enum"] = [prop.pop("const")]

    renames = {"exclusiveMinimum": "minimum", "exclusiveMaximum": "maximum"}
    for exclusive_key, plain_key in renames.items():
        if exclusive_key not in prop:
            continue
        limit = prop[exclusive_key]
        prop[exclusive_key] = True
        prop[plain_key] = limit

758 

759 

def _downgrade_json_schema(model_spec):
    """
    OpenAPI v3 uses an older version of JSON Schema than the one Pydantic
    generates. This function modifies, in place, the pydantic output for a
    model so that it is compatible with OpenAPI.

    "title" is used for models and model properties in the pydantic output but
    OpenAPI doesn't need it, and it adds verbosity, so it is deleted.

    Only the model's direct properties, and their "items", are processed.
    """
    model_spec.pop("title", None)
    for prop in model_spec.get("properties", {}).values():
        prop.pop("title", None)
        downgrade_prop(prop)
        if "items" in prop:
            downgrade_prop(prop["items"])

777 

778 

def servers_object(path):
    """
    Build the OpenAPI "servers" list for an API mounted at the given path.

    @param path - mount path of the API, e.g. "api/". Leading slashes are
                  ignored so that "/api/" (as derived from a request's
                  script_name) doesn't produce a double slash in the URL.
    """
    relative_path = path.lstrip("/")
    return [
        {"url": f"https://www.rfpalchemy.com/{relative_path}"},
        {"url": f"http://localhost:9000/{relative_path}"},
    ]

784 

785 

def add_model_components(spec, sux_instance: Sux):
    """
    Register a schema component in the spec for each Pydantic model found in
    the models module of the given Sux instance.

    @param spec - the APISpec to which the schema components are added
    @param sux_instance - Sux whose models_module names the module to inspect

    Raises ValueError if sux_instance.models_module is not a string.
    """
    if isinstance(sux_instance.models_module, str):
        models_mod = importlib.import_module(sux_instance.models_module)
    else:
        raise ValueError(
            "sux_instance models_module attribute must be a string module name"
        )

    # every BaseModel subclass which is an attribute of the models module
    mods: Sequence = [
        (mod, "validation")
        for mod in (getattr(models_mod, m) for m in dir(models_mod))
        if (
            isinstance(mod, type)
            and issubclass(mod, BaseModel)
            and mod is not BaseModel
        )
    ]

    try:
        # This call to model_rebuild is required because Nodes has recursive properties
        # - horrible hack because I cannot figure out how to determine if a model has been
        # created with call to typing.ForwardRef. See pydantic docs
        # https://pydantic-docs.helpmanual.io/usage/postponed_annotations/
        getattr(models_mod, "Nodes").model_rebuild()
    except AttributeError:
        pass

    # The template must contain the {model} placeholder: pydantic builds each
    # $ref with ref_template.format(model=...), so without it every reference
    # would be the same string and point at no schema in particular.
    _, fulldefs = models_json_schema(
        mods, ref_template="#/components/schemas/{model}"
    )

    defs = fulldefs["$defs"]
    for model_name, model_spec in defs.items():
        _downgrade_json_schema(model_spec)
        spec.components.schema(model_name, model_spec)

820 

821 

def create_spec(sux_instance: Sux, path="api/") -> APISpec:
    """
    Build the OpenAPI (3.0.3) specification for the given Sux instance.

    One path is added per distinct handler path (in sorted order, each with
    all of the handlers which share that path), followed by the model schema
    components and one tag per handler sub module.

    @param sux_instance - the Sux whose handlers are to be documented
    @param path - mount path of the API, used for the "servers" entries
    """
    from .web.base import API_VERSION

    spec = APISpec(
        title=sux_instance.api_name,
        info={"description": "API for managing evaluation projects"},
        version=str(API_VERSION),
        openapi_version="3.0.3",
        plugins=[SuxPlugin()],
        servers=servers_object(path),
    )

    # group handlers by URL: GET and POST handlers can share the same path
    handlers_by_path = defaultdict(list)
    for handler in sux_instance.iter_handlers():
        handlers_by_path[handler.path].append(handler)

    for url_key in sorted(handlers_by_path):
        spec.path(path=url_key, sux_handler=(url_key, handlers_by_path[url_key]))

    add_model_components(spec, sux_instance)

    for mod_name, mod in sux_instance.modules.items():
        tag = {"name": mod_name.capitalize()}
        if mod.__doc__:
            tag["description"] = textwrap.dedent(mod.__doc__).strip()
        spec.tag(tag)

    return spec