Coverage for rfpy/jobs/events/webhooks.py: 88%

74 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-31 16:00 +0000

1import logging 

2from datetime import datetime 

3import time 

4 

5import requests 

6from sqlalchemy.orm import sessionmaker 

7from sqlalchemy.orm.exc import NoResultFound 

8 

9from rfpy.model.notify import WebhookSubscription 

10from rfpy.utils import configure_rfpy, build_engine 

11from rfpy import conf 

12 

13try: # pragma: no cover 

14 import uwsgi # type: ignore # noqa: F401 

15 import uwsgidecorators 

16except ImportError: 

17 from rfpy.jobs import uwsgi_stubs as uwsgidecorators 

18 

19 

20log = logging.getLogger(__name__) 

21 

22 

23@uwsgidecorators.spool 

24def retry_ping_webhook(arg_dict): 

25 if conf.CONF is None: # pragma: no cover 

26 configure_rfpy() 

27 engine = build_engine() 

28 Session = sessionmaker(engine) 

29 session = Session() 

30 wh_org_id = arg_dict['webhook_org_id'] 

31 wh_event_type = arg_dict['webhook_event_type'] 

32 post_body = arg_dict['event_model_bytes'] 

33 try: 

34 wh = ( 

35 session.query(WebhookSubscription) 

36 .filter_by(org_id=wh_org_id, event_type=wh_event_type) 

37 .one() 

38 ) 

39 ping_webhook(wh, post_body) 

40 session.commit() 

41 except NoResultFound: 

42 log.error( 

43 "Ping webhook failed - cannot find WebhookSubscription for org %, event %s", 

44 wh_org_id, wh_event_type 

45 ) 

46 finally: 

47 session.close() 

48 

49 

50def ping_webhook(wh: WebhookSubscription, ev_model_bytes: bytes): 

51 ''' 

52 POST a json document of Event details to an external url configured by a user of the system 

53 

54 On failure retry until the maximum number of tries defined in WebhookSubscription has been 

55 reached 

56 ''' 

57 if wh.is_aborted: 

58 log.info('Webhook for %s is already aborted, quitting ping attempt', wh.remote_url) 

59 return 

60 elif wh.max_tries_exceeded: 

61 log.warning('Maximum number of tries exceeded for %s, aborting', wh.remote_url) 

62 wh.set_aborted() 

63 return 

64 

65 headers = {'Content-type': 'application/json', 'Accept': 'text/plain'} 

66 if wh.http_header: 

67 header_name, header_value = tuple(wh.http_header.split(':')) 

68 headers[header_name] = header_value 

69 

70 def _retry(msg): 

71 log.warning(msg) 

72 wh.set_failed_attempt(msg) 

73 

74 if wh.max_tries_exceeded: 

75 log.warning('Maximum number of tries exceeded for %s, aborting', wh.remote_url) 

76 wh.set_aborted() 

77 return 

78 

79 retry_ping_webhook( 

80 { 

81 'webhook_org_id': wh.org_id.encode('utf8'), 

82 'webhook_event_type': wh.event_type.encode('utf8'), 

83 'event_model_bytes': ev_model_bytes 

84 } 

85 ) 

86 next_try = time.time() 

87 next_dt = datetime.fromtimestamp(next_try) 

88 log.warning('Scheduling webhook ping retry for url %s at %s', wh.remote_url, next_dt) 

89 

90 resp = None 

91 try: 

92 resp = requests.post( 

93 wh.remote_url, ev_model_bytes, timeout=6.1, stream=True, headers=headers 

94 ) 

95 except requests.Timeout: 

96 m = f"Webhook ping to {wh.remote_url} timed out" 

97 _retry(m) 

98 except requests.ConnectionError: 

99 m = f"Webhook ping to {wh.remote_url} failed to connect" 

100 _retry(m) 

101 

102 if resp is not None: 

103 if resp.ok: 

104 resp_size = 0 

105 for chunk in resp.iter_content(512): 

106 resp_size += len(chunk) 

107 if resp_size > 1024: 

108 wh.error_message = f'Response from {wh.remote_url} > 1024 bytes' 

109 log.warning('Response body from %s too long, aborting', wh.remote_url) 

110 wh.set_aborted() 

111 resp.close() 

112 break 

113 else: 

114 log.info('Ping for %s successfully executed', wh.remote_url) 

115 wh.set_delivered() 

116 else: 

117 msg = f"HTTP {resp.status_code}: {resp.text[:90]}" 

118 _retry(msg)