Coverage for rfpy/jobs/events/webhooks.py: 98%

55 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-24 10:52 +0000

1import logging 

2 

3import requests 

4from retry_requests import retry # type: ignore[import] 

5from sqlalchemy import select 

6from sqlalchemy.orm import sessionmaker 

7from sqlalchemy.orm.exc import NoResultFound 

8 

9from rfpy.model.notify import ( 

10 WebhookSubscription, 

11 DeliveryStatus, 

12) # Add DeliveryStatus import 

13from rfpy.utils import configure_rfpy, build_engine 

14from rfpy import conf 

15 

16try: # pragma: no cover 

17 import uwsgi # type: ignore # noqa: F401 

18 import uwsgidecorators # type: ignore[import] 

19except ImportError: 

20 from rfpy.jobs import uwsgi_stubs as uwsgidecorators 

21 

22 

23log = logging.getLogger(__name__) 

24 

25 

26@uwsgidecorators.spool 

27def retry_ping_webhook(arg_dict): 

28 if conf.CONF is None: # pragma: no cover 

29 configure_rfpy() 

30 engine = build_engine() 

31 Session = sessionmaker(engine, future=True) 

32 session = Session() 

33 wh_org_id = arg_dict["webhook_org_id"] 

34 wh_event_type = arg_dict["webhook_event_type"] 

35 post_body = arg_dict["event_model_bytes"] 

36 stmt = select(WebhookSubscription).where( 

37 WebhookSubscription.org_id == wh_org_id, 

38 WebhookSubscription.event_type == wh_event_type, 

39 ) 

40 try: 

41 wh = session.execute(stmt).scalars().one() 

42 if wh.delivery_status != DeliveryStatus.aborted: 

43 ping_webhook(wh, post_body) 

44 session.commit() 

45 except NoResultFound: 

46 log.error( 

47 "Ping webhook failed - cannot find WebhookSubscription for org %s, event %s", 

48 wh_org_id, 

49 wh_event_type, 

50 ) 

51 finally: 

52 session.close() 

53 

54 

55def ping_webhook(wh: WebhookSubscription, ev_model_bytes: bytes): 

56 """ 

57 POST a json document of Event details to an external url configured by a user of the system 

58 

59 On failure retry until the maximum number of tries defined in WebhookSubscription has been 

60 reached 

61 """ 

62 headers = {"Content-type": "application/json", "Accept": "text/plain"} 

63 if wh.http_header: 

64 header_name, header_value = tuple(wh.http_header.split(":")) 

65 headers[header_name] = header_value 

66 try: 

67 http_session = retry( 

68 requests.Session(), retries=wh.MAX_ATTEMPTS, backoff_factor=0.5 

69 ) 

70 with http_session.post( 

71 wh.remote_url, ev_model_bytes, timeout=3, headers=headers, stream=True 

72 ) as resp: 

73 buff = "" 

74 for chunk in resp.iter_content(chunk_size=1024): 

75 if chunk is None: 

76 break 

77 buff += chunk.decode("utf-8") 

78 if len(buff) > 1024 * 10: 

79 log.warning("Webhook response too large, truncating") 

80 resp.close() 

81 break 

82 if resp.status_code == 200: 

83 wh.set_delivered() 

84 else: 

85 msg = buff[:99] if buff else "No response text" 

86 wh.set_failed_attempt(f"HTTP status code {resp.status_code}, {msg}") 

87 resp.raise_for_status() 

88 except Exception as e: 

89 log.exception("Error sending webhook to %s", wh.remote_url) 

90 wh.delivery_status = DeliveryStatus.failing # Ensure status is updated 

91 wh.error_message = str(e)