Coverage for rfpy/jobs/events/fanout.py: 100%
40 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-31 16:00 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-31 16:00 +0000
1import orjson
2import webob
3import webob.dec
4import webob.exc
5import requests
6from requests.auth import HTTPBasicAuth
8from rfpy import conf
9from rfpy.utils import json_default
10from rfpy.web.request import HttpRequest
11from rfpy.model.humans import User
12from rfpy.buyer.webapp import BuyerApp
# Fallback channel name: used by subscribe() when the request has no user and
# by send_update() when no channel is passed.
DEFAULT_FANOUT_CHANNEL = 'update-channel'
@BuyerApp.route('subscribe-events')
def subscribe(request: HttpRequest) -> webob.Response:
    '''
    Handle an EventSource subscription request from the Fanout.io api.

    The held stream is bound to the requesting user's ``org_id`` as its
    channel, or to DEFAULT_FANOUT_CHANNEL when the request carries no user.
    Any method other than GET is answered with HTTPMethodNotAllowed.

    See https://docs.fanout.io/docs/subscribing
    '''
    current_user: User = request.user
    channel = DEFAULT_FANOUT_CHANNEL if current_user is None else current_user.org_id

    if request.method.upper() != 'GET':
        return webob.exc.HTTPMethodNotAllowed('Only GET requests supported')

    # Headers attached to the empty event-stream response; the Grip-* entries
    # are presumably instructions for the Fanout proxy (see the link above).
    stream_headers = (
        ('Cache-Control', 'no-cache'),
        ('Grip-Hold', 'stream'),
        ('Grip-Channel', channel),
        ('Grip-Keep-Alive', ':\\n\\n; format=cstring; timeout=20'),
    )
    stream_response = webob.Response("", content_type='text/event-stream')
    for header_name, header_value in stream_headers:
        stream_response.headers[header_name] = header_value
    return stream_response
def fanout_envelope():
    '''
    Build a publish envelope holding a single placeholder http-stream item.

    Every call creates new nested dicts, so the caller can overwrite the
    item's 'channel' and 'content' without affecting later envelopes.
    '''
    stream_format = {'content': 'event: message\ndata: hello world\n\n'}
    placeholder_item = {
        'channel': 'test',
        'formats': {'http-stream': stream_format},
    }
    return {'items': [placeholder_item]}
def send_update(event_id: int, event_type: str, evt_data: dict, channel: str = None):
    '''
    Publish one server-sent event to a Fanout.io channel.

    :param event_id: emitted as the SSE ``id: evt-<event_id>`` field
    :param event_type: emitted as the SSE ``event:`` field
    :param evt_data: payload, serialised to JSON for the SSE ``data:`` field
    :param channel: target channel; DEFAULT_FANOUT_CHANNEL is used when None
    :return: the response object returned by ``requests.post`` for the
             publish call
    '''
    if channel is None:
        channel = DEFAULT_FANOUT_CHANNEL

    # orjson.dumps() returns bytes, not str. Decode before interpolating,
    # otherwise the f-string embeds the bytes repr (b'...') in the data field
    # and subscribers receive something that is not valid JSON.
    js_string = orjson.dumps(evt_data, default=json_default).decode('utf-8')
    content = f"id: evt-{event_id}\nevent: {event_type}\ndata: {js_string}\n\n"

    auth = HTTPBasicAuth(conf.CONF.fanout_realm_id, conf.CONF.fanout_realm_key)
    fanout_api_url = f"https://api.fanout.io/realm/{conf.CONF.fanout_realm_id}/publish/"

    # Start from the template envelope and overwrite its placeholder values.
    envelope = fanout_envelope()
    msg_model = envelope['items'][0]
    msg_model['channel'] = channel
    msg_model['formats']['http-stream']['content'] = content

    return requests.post(fanout_api_url, json=envelope, auth=auth, timeout=3)
if __name__ == '__main__':
    # Manual smoke test: publish one sample event using local settings.
    from rfpy.utils import configure_rfpy

    js_data = dict(title='Savage Enquiry', id=23432)
    configure_rfpy(env_file='~/projects/ssjava/collab/rfpy-settings.env')
    # send_update() takes (event_id, event_type, evt_data, channel). The event
    # id was previously omitted, which shifted every argument one position:
    # the dict was sent as the event type and 'mychannel' as the payload.
    # The id value 1 is an arbitrary placeholder for this manual run.
    res = send_update(1, 'ISSUE_RELEASED', js_data, 'mychannel')
    print(res)