Coverage for rfpy/api/endpoints/webhooks.py: 96%

52 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-31 16:00 +0000

1''' 

2Manage Webhook Subscriptions - register external URLs to receive notification of system events. 

3''' 

4from typing import List 

5 

6from rfpy.auth import perms 

7from rfpy.web import serial 

8from rfpy.suxint import http 

9from rfpy.model.notify import WebhookSubscription 

10from rfpy.model.humans import Organisation 

11 

12from .. import fetch, validate 

13 

14 

@http
def post_webhook(session, user, webhook_doc: serial.NewWebhook):
    '''
    Register a new Webhook subscription for the `event_type` named in the body JSON document.

    Once saved, a POST request with a JSON encoded body is sent to the URL provided in the
    document for each event of the given `event_type`.

    The caller must pass the MANAGE_ORGANISATION permission check for the organisation
    identified by `org_id` in the document.
    '''
    own_org = user.organisation
    targets_other_org = webhook_doc.org_id != user.org_id
    # Only look up an organisation when the document names one other than the user's own
    target_org = (
        fetch.organisation(session, webhook_doc.org_id) if targets_other_org else own_org
    )
    validate.check(user, perms.MANAGE_ORGANISATION, target_org=target_org)

    subscription = WebhookSubscription(**webhook_doc.dict())
    session.add(subscription)

29 

30 

@http
def put_webhook(session, user, webhook_doc: serial.NewWebhook):
    '''
    Replace the `remote_url` and `http_header` of the existing Webhook identified by the
    `org_id` / `event_type` pair in the body JSON document.

    The subscription's status is reset (via `reset_status()`) after the update, so
    delivery_status and retries return to their default values.

    Raises ValueError if no subscription exists for the given `org_id` / `event_type`.
    '''
    whd = webhook_doc
    own_org = user.organisation
    if whd.org_id != user.org_id:
        target_org = fetch.organisation(session, whd.org_id)
    else:
        target_org = own_org
    validate.check(user, perms.MANAGE_ORGANISATION, target_org=target_org)

    # Subscriptions are looked up by the (org_id, event_type) pair
    subscription_key = (whd.org_id, whd.event_type)
    wh: WebhookSubscription = session.query(WebhookSubscription).get(subscription_key)
    if wh is None:
        raise ValueError(f"No Subscription found for org {whd.org_id} and {whd.event_type}")

    wh.remote_url = whd.remote_url
    wh.http_header = whd.http_header
    wh.reset_status()

49 

50 

@http
def get_webhooks(session, user, q_org_id) -> List[serial.Webhook]:
    '''
    Return an array of Webhook objects belonging to the current user's organisation, or
    to the organisation named by query param `orgId` when that parameter is supplied.

    The caller must pass the MANAGE_ORGANISATION permission check for the organisation
    whose webhooks are being listed.
    '''
    own_org = user.organisation
    target_org = own_org if q_org_id is None else fetch.organisation(session, q_org_id)
    validate.check(user, perms.MANAGE_ORGANISATION, target_org=target_org)

    subscriptions = target_org.webhook_subscriptions
    return list(map(serial.Webhook.from_orm, subscriptions))

62 

63 

@http
def delete_webhook(session, user, webhook_doc: serial.NewWebhook):
    '''
    Remove the Webhook subscription identified by the `event_type` / `org_id` pair in the
    body JSON document.

    Raises ValueError if the named organisation cannot be found, or if no subscription
    exists for the given `org_id` / `event_type`.
    '''
    whd = webhook_doc
    own_org = user.organisation
    if whd.org_id == user.org_id:
        target_org = own_org
    else:
        # A different organisation was named: load it directly and fail if it is unknown
        target_org = session.query(Organisation).get(whd.org_id)
        if target_org is None:
            raise ValueError(f'Organisation {webhook_doc.org_id} not found')
    validate.check(user, perms.MANAGE_ORGANISATION, target_org=target_org)

    subscription_key = (whd.org_id, whd.event_type)
    subscription = session.query(WebhookSubscription).get(subscription_key)
    if subscription is None:
        raise ValueError(f"No Subscription found for org {whd.org_id} and {whd.event_type}")
    session.delete(subscription)

80 

81 

@http
def get_webhook_events(session, user) -> List[str]:
    '''
    Return an array of the event types that accept webhook subscriptions.
    '''
    # NOTE(review): function-scope import, presumably to avoid an import cycle - confirm
    from rfpy.jobs.events.action import webhook_evt_types

    subscribable_event_types = webhook_evt_types()
    return subscribable_event_types