Coverage for rfpy/jobs/events/webhooks.py: 98%
55 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
1import logging
3import requests
4from retry_requests import retry # type: ignore[import]
5from sqlalchemy import select
6from sqlalchemy.orm import sessionmaker
7from sqlalchemy.orm.exc import NoResultFound
9from rfpy.model.notify import (
10 WebhookSubscription,
11 DeliveryStatus,
12) # Add DeliveryStatus import
13from rfpy.utils import configure_rfpy, build_engine
14from rfpy import conf
16try: # pragma: no cover
17 import uwsgi # type: ignore # noqa: F401
18 import uwsgidecorators # type: ignore[import]
19except ImportError:
20 from rfpy.jobs import uwsgi_stubs as uwsgidecorators
23log = logging.getLogger(__name__)
@uwsgidecorators.spool
def retry_ping_webhook(arg_dict):
    """
    Spooler task: look up a WebhookSubscription and deliver an event to it.

    ``arg_dict`` must provide three keys:

    - ``webhook_org_id`` and ``webhook_event_type``, which together select the
      subscription row;
    - ``event_model_bytes``, the request body handed to ``ping_webhook``.

    Subscriptions whose delivery status is ``aborted`` are not pinged. If no
    matching subscription exists an error is logged. The database session is
    closed in every case.
    """
    if conf.CONF is None:  # pragma: no cover
        configure_rfpy()

    # Each invocation builds its own engine and session.
    make_session = sessionmaker(build_engine(), future=True)
    db = make_session()

    org_id = arg_dict["webhook_org_id"]
    event_type = arg_dict["webhook_event_type"]
    payload = arg_dict["event_model_bytes"]

    lookup = select(WebhookSubscription).where(
        WebhookSubscription.org_id == org_id,
        WebhookSubscription.event_type == event_type,
    )

    try:
        subscription = db.execute(lookup).scalars().one()
        if subscription.delivery_status != DeliveryStatus.aborted:
            ping_webhook(subscription, payload)
        # Persist whatever delivery state ping_webhook recorded on the row.
        db.commit()
    except NoResultFound:
        log.error(
            "Ping webhook failed - cannot find WebhookSubscription for org %s, event %s",
            org_id,
            event_type,
        )
    finally:
        db.close()
def ping_webhook(wh: WebhookSubscription, ev_model_bytes: bytes) -> None:
    """
    POST a json document of Event details to an external url configured by a user of the system

    On failure retry until the maximum number of tries defined in WebhookSubscription has been
    reached

    :param wh: subscription providing ``remote_url``, the optional ``http_header``
        (a single ``Name: value`` line) and ``MAX_ATTEMPTS``; its delivery state is
        updated in place, the caller is responsible for committing it.
    :param ev_model_bytes: request body to send.

    No exception escapes this function: any error (malformed header, network
    failure, HTTP error status) is logged and recorded on ``wh``.
    """
    max_body_bytes = 1024 * 10  # stop reading the response beyond this size
    headers = {"Content-type": "application/json", "Accept": "text/plain"}
    http_session = None
    try:
        if wh.http_header:
            # Split on the first colon only so that values which themselves
            # contain ':' (tokens, URLs) survive, and strip the whitespace
            # that the usual "Name: value" spelling leaves around both parts.
            header_name, colon, header_value = wh.http_header.partition(":")
            header_name = header_name.strip()
            if not colon or not header_name:
                # Do not echo the header itself: it may carry a secret.
                raise ValueError(
                    "Malformed http_header on webhook subscription, expected 'Name: value'"
                )
            headers[header_name] = header_value.strip()

        http_session = retry(
            requests.Session(), retries=wh.MAX_ATTEMPTS, backoff_factor=0.5
        )
        with http_session.post(
            wh.remote_url, ev_model_bytes, timeout=3, headers=headers, stream=True
        ) as resp:
            # Collect raw bytes and decode once afterwards: decoding each chunk
            # separately breaks when a multi-byte character straddles a chunk
            # boundary.
            body = bytearray()
            for chunk in resp.iter_content(chunk_size=1024):
                if chunk is None:
                    break
                body += chunk
                if len(body) > max_body_bytes:
                    log.warning("Webhook response too large, truncating")
                    resp.close()
                    break
            # The text is only used for a short diagnostic message, so undecodable
            # bytes are replaced rather than treated as a delivery error.
            text = body.decode("utf-8", errors="replace")

            if resp.status_code == 200:
                wh.set_delivered()
            else:
                msg = text[:99] if text else "No response text"
                wh.set_failed_attempt(f"HTTP status code {resp.status_code}, {msg}")
                # NOTE(review): for HTTP error statuses this raises into the
                # handler below, which then overwrites the status/message that
                # set_failed_attempt() just recorded - confirm this is intended.
                resp.raise_for_status()
    except Exception as e:
        log.exception("Error sending webhook to %s", wh.remote_url)
        wh.delivery_status = DeliveryStatus.failing  # Ensure status is updated
        wh.error_message = str(e)
    finally:
        # Release the connection pool owned by the per-call session.
        if http_session is not None:
            http_session.close()