Coverage for rfpy/jobs/events/webhooks.py: 88%
74 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-31 16:00 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-31 16:00 +0000
1import logging
2from datetime import datetime
3import time
5import requests
6from sqlalchemy.orm import sessionmaker
7from sqlalchemy.orm.exc import NoResultFound
9from rfpy.model.notify import WebhookSubscription
10from rfpy.utils import configure_rfpy, build_engine
11from rfpy import conf
13try: # pragma: no cover
14 import uwsgi # type: ignore # noqa: F401
15 import uwsgidecorators
16except ImportError:
17 from rfpy.jobs import uwsgi_stubs as uwsgidecorators
20log = logging.getLogger(__name__)
@uwsgidecorators.spool
def retry_ping_webhook(arg_dict):
    '''
    Spooled task: look up a WebhookSubscription and attempt to ping it again.

    arg_dict must contain the keys 'webhook_org_id', 'webhook_event_type' and
    'event_model_bytes'. The first two select the subscription; the third is handed to
    ping_webhook() as the POST body. A missing key raises KeyError.

    The session is committed after ping_webhook() returns and is always closed. If no
    matching subscription exists an error is logged and nothing is posted.
    '''
    if conf.CONF is None:  # pragma: no cover
        configure_rfpy()
    engine = build_engine()
    Session = sessionmaker(engine)
    session = Session()
    wh_org_id = arg_dict['webhook_org_id']
    wh_event_type = arg_dict['webhook_event_type']
    post_body = arg_dict['event_model_bytes']
    try:
        wh = (
            session.query(WebhookSubscription)
            .filter_by(org_id=wh_org_id, event_type=wh_event_type)
            .one()
        )
        ping_webhook(wh, post_body)
        session.commit()
    except NoResultFound:
        # Both placeholders must be valid conversions ("%s"); a bare "%" makes the
        # logging module fail to format the record and the message is lost.
        log.error(
            "Ping webhook failed - cannot find WebhookSubscription for org %s, event %s",
            wh_org_id, wh_event_type
        )
    finally:
        session.close()
def ping_webhook(wh: WebhookSubscription, ev_model_bytes: bytes):
    '''
    POST a json document of Event details to an external url configured by a user of the system

    On failure retry until the maximum number of tries defined in WebhookSubscription has been
    reached

    Outcomes are recorded on `wh`:
    - already aborted, or maximum tries exceeded: nothing is posted (in the latter case
      wh.set_aborted() is called)
    - timeout, connection failure or non-OK HTTP status: wh.set_failed_attempt() is called
      and, unless the maximum number of tries is now exceeded, retry_ping_webhook is invoked
    - OK response with a body larger than 1024 bytes: wh.error_message is set and
      wh.set_aborted() is called
    - OK response otherwise: wh.set_delivered() is called

    If wh.http_header is set it must have the form "Name: value"; a value without any
    colon raises ValueError.
    '''
    if wh.is_aborted:
        log.info('Webhook for %s is already aborted, quitting ping attempt', wh.remote_url)
        return
    elif wh.max_tries_exceeded:
        log.warning('Maximum number of tries exceeded for %s, aborting', wh.remote_url)
        wh.set_aborted()
        return

    headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
    if wh.http_header:
        # Split on the first colon only so that values containing ':' (URLs, credentials,
        # timestamps) stay intact, and strip the whitespace that normally follows the
        # colon - Requests rejects header values with leading whitespace.
        header_name, header_value = wh.http_header.split(':', 1)
        headers[header_name.strip()] = header_value.strip()

    def _retry(msg):
        # Record the failed attempt, then either give up or hand over to the retry task.
        log.warning(msg)
        wh.set_failed_attempt(msg)

        if wh.max_tries_exceeded:
            log.warning('Maximum number of tries exceeded for %s, aborting', wh.remote_url)
            wh.set_aborted()
            return

        retry_ping_webhook(
            {
                'webhook_org_id': wh.org_id.encode('utf8'),
                'webhook_event_type': wh.event_type.encode('utf8'),
                'event_model_bytes': ev_model_bytes
            }
        )
        next_try = time.time()
        next_dt = datetime.fromtimestamp(next_try)
        log.warning('Scheduling webhook ping retry for url %s at %s', wh.remote_url, next_dt)

    resp = None
    try:
        resp = requests.post(
            wh.remote_url, ev_model_bytes, timeout=6.1, stream=True, headers=headers
        )
    except requests.Timeout:
        m = f"Webhook ping to {wh.remote_url} timed out"
        _retry(m)
    except requests.ConnectionError:
        m = f"Webhook ping to {wh.remote_url} failed to connect"
        _retry(m)

    if resp is None:
        return

    try:
        if resp.ok:
            # The body is streamed so that an oversized response can be rejected
            # without downloading all of it.
            resp_size = 0
            for chunk in resp.iter_content(512):
                resp_size += len(chunk)
                if resp_size > 1024:
                    wh.error_message = f'Response from {wh.remote_url} > 1024 bytes'
                    log.warning('Response body from %s too long, aborting', wh.remote_url)
                    wh.set_aborted()
                    break
            else:
                # Loop finished without hitting the size limit
                log.info('Ping for %s successfully executed', wh.remote_url)
                wh.set_delivered()
        else:
            msg = f"HTTP {resp.status_code}: {resp.text[:90]}"
            _retry(msg)
    finally:
        # A streamed response holds its connection until it is fully read or closed;
        # close on every path, including errors raised while reading the body.
        resp.close()