Coverage for rfpy/jobs/events/fanout.py: 100%

43 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-24 10:52 +0000

1from typing import Optional 

2 

3import orjson 

4import webob 

5import webob.dec 

6import webob.exc 

7import requests 

8from requests.auth import HTTPBasicAuth 

9 

10from rfpy import conf 

11from rfpy.utils import json_default 

12from rfpy.web.request import HttpRequest 

13from rfpy.model.humans import User 

14from rfpy.buyer.webapp import BuyerApp 

15 

16DEFAULT_FANOUT_CHANNEL = "update-channel" 

17 

18 

19@BuyerApp.route("subscribe-events") 

20def subscribe(request: HttpRequest) -> webob.Response: 

21 """ 

22 Handle an EventSource subscription request from the Fanout.io api 

23 See https://docs.fanout.io/docs/subscribing 

24 """ 

25 user: User = request.user 

26 if user is not None: 

27 channel = user.org_id 

28 else: 

29 channel = DEFAULT_FANOUT_CHANNEL 

30 if request.method.upper() == "GET": 

31 response = webob.Response("", content_type="text/event-stream") 

32 response.headers["Cache-Control"] = "no-cache" 

33 response.headers["Grip-Hold"] = "stream" 

34 response.headers["Grip-Channel"] = channel 

35 response.headers["Grip-Keep-Alive"] = ":\\n\\n; format=cstring; timeout=20" 

36 return response 

37 

38 return webob.exc.HTTPMethodNotAllowed("Only GET requests supported") 

39 

40 

41def fanout_envelope(): 

42 return { 

43 "items": [ 

44 { 

45 "channel": "test", 

46 "formats": { 

47 "http-stream": {"content": "event: message\ndata: hello world\n\n"} 

48 }, 

49 } 

50 ] 

51 } 

52 

53 

54def send_update( 

55 event_id: int, event_type: str, evt_data: dict, channel: Optional[str] = None 

56): 

57 if channel is None: 

58 channel = DEFAULT_FANOUT_CHANNEL 

59 js_string = orjson.dumps(evt_data, default=json_default).decode("utf-8") 

60 content = f"id: evt-{event_id}\nevent: {event_type}\ndata: {js_string}\n\n" 

61 

62 assert conf.CONF.fanout_realm_id is not None 

63 assert conf.CONF.fanout_realm_key is not None 

64 

65 auth = HTTPBasicAuth(conf.CONF.fanout_realm_id, conf.CONF.fanout_realm_key) 

66 fanout_api_url = f"https://api.fanout.io/realm/{conf.CONF.fanout_realm_id}/publish/" 

67 envelope = fanout_envelope() 

68 msg_model = envelope["items"][0] 

69 msg_model["channel"] = channel 

70 msg_model["formats"]["http-stream"]["content"] = content 

71 

72 return requests.post(fanout_api_url, json=envelope, auth=auth, timeout=3) 

73 

74 

75if __name__ == "__main__": 

76 from rfpy.utils import configure_rfpy 

77 

78 js_data = dict(title="Savage Enquiry", id=23432) 

79 configure_rfpy(env_file="~/projects/ssjava/collab/rfpy-settings.env") 

80 res = send_update(22, "ISSUE_RELEASED", js_data, "mychannel") 

81 print(res)