Coverage for rfpy/jobs/events/fanout.py: 100%

40 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-31 16:00 +0000

1import orjson 

2import webob 

3import webob.dec 

4import webob.exc 

5import requests 

6from requests.auth import HTTPBasicAuth 

7 

8from rfpy import conf 

9from rfpy.utils import json_default 

10from rfpy.web.request import HttpRequest 

11from rfpy.model.humans import User 

12from rfpy.buyer.webapp import BuyerApp 

13 

# Fallback channel name: used by subscribe() when the request has no user and
# by send_update() when the caller does not name a channel.
DEFAULT_FANOUT_CHANNEL = 'update-channel'

15 

16 

@BuyerApp.route('subscribe-events')
def subscribe(request: HttpRequest) -> webob.Response:
    '''
    Answer an EventSource subscription request relayed by the Fanout.io api.
    See https://docs.fanout.io/docs/subscribing

    The subscriber is attached to the channel named by the requesting user's
    org_id, or to DEFAULT_FANOUT_CHANNEL when request.user is None.

    A GET request receives an empty text/event-stream response carrying the
    Grip-* headers. For any other method an HTTPMethodNotAllowed object is
    returned (it is not raised).
    '''
    subscriber: User = request.user
    channel = DEFAULT_FANOUT_CHANNEL if subscriber is None else subscriber.org_id

    if request.method.upper() != 'GET':
        return webob.exc.HTTPMethodNotAllowed('Only GET requests supported')

    stream = webob.Response("", content_type='text/event-stream')
    header_values = (
        ('Cache-Control', 'no-cache'),
        ('Grip-Hold', 'stream'),
        ('Grip-Channel', channel),
        # The doubled backslashes put a literal "\n\n" (backslash + n) in the
        # header value; presumably the proxy unescapes it because of
        # format=cstring -- confirm against the Fanout docs.
        ('Grip-Keep-Alive', ':\\n\\n; format=cstring; timeout=20'),
    )
    for header_name, header_value in header_values:
        stream.headers[header_name] = header_value
    return stream

37 

38 

def fanout_envelope():
    '''
    Build a new publish envelope holding one placeholder item.

    The returned dict has a single entry under 'items' with a 'channel' and
    an 'http-stream' format whose 'content' is a sample event. Every call
    creates new dict/list objects, so the result can be mutated freely.
    '''
    stream_format = {'content': 'event: message\ndata: hello world\n\n'}
    placeholder_item = {
        'channel': 'test',
        'formats': {'http-stream': stream_format},
    }
    return {'items': [placeholder_item]}

52 

53 

def send_update(event_id: int, event_type: str, evt_data: dict, channel: str = None):
    '''
    Publish one server-sent event to a Fanout.io channel.

    event_id   -- used to build the event's "id: evt-<event_id>" line
    event_type -- value of the "event:" line
    evt_data   -- payload, serialised to JSON (json_default handles values
                  orjson cannot serialise natively) for the "data:" line
    channel    -- target channel; DEFAULT_FANOUT_CHANNEL when None

    Returns the result of requests.post against the realm's publish
    endpoint. Nothing is caught here, so any exception from serialisation or
    from the HTTP call (which uses a 3 second timeout) propagates.
    '''
    if channel is None:
        channel = DEFAULT_FANOUT_CHANNEL

    # orjson.dumps returns bytes. Decode it so the f-string below embeds the
    # JSON text itself rather than the b'...' repr of the bytes object.
    js_string = orjson.dumps(evt_data, default=json_default).decode('utf-8')
    content = f"id: evt-{event_id}\nevent: {event_type}\ndata: {js_string}\n\n"

    auth = HTTPBasicAuth(conf.CONF.fanout_realm_id, conf.CONF.fanout_realm_key)
    fanout_api_url = f"https://api.fanout.io/realm/{conf.CONF.fanout_realm_id}/publish/"

    # Start from the template envelope and overwrite its placeholder values.
    envelope = fanout_envelope()
    msg_model = envelope['items'][0]
    msg_model['channel'] = channel
    msg_model['formats']['http-stream']['content'] = content
    return requests.post(fanout_api_url, json=envelope, auth=auth, timeout=3)

66 

67 

if __name__ == '__main__':
    # Manual smoke test: load settings from a local env file, publish one
    # sample event and print what requests.post returned.
    from rfpy.utils import configure_rfpy
    js_data = dict(title='Savage Enquiry', id=23432)
    configure_rfpy(env_file='~/projects/ssjava/collab/rfpy-settings.env')
    # send_update expects (event_id, event_type, evt_data, channel); the
    # event id must come first, otherwise the event type string is taken as
    # the id and the payload dict as the event type. 1 is an arbitrary id
    # for this manual run.
    res = send_update(1, 'ISSUE_RELEASED', js_data, 'mychannel')
    print(res)