Coverage for rfpy/jobs/events/fanout.py: 100%
43 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
1from typing import Optional
3import orjson
4import webob
5import webob.dec
6import webob.exc
7import requests
8from requests.auth import HTTPBasicAuth
10from rfpy import conf
11from rfpy.utils import json_default
12from rfpy.web.request import HttpRequest
13from rfpy.model.humans import User
14from rfpy.buyer.webapp import BuyerApp
16DEFAULT_FANOUT_CHANNEL = "update-channel"
@BuyerApp.route("subscribe-events")
def subscribe(request: HttpRequest) -> webob.Response:
    """
    Answer an EventSource subscription request relayed by the Fanout.io API.

    The stream is bound to the channel named by the requesting user's
    ``org_id``; when the request carries no user, ``DEFAULT_FANOUT_CHANNEL``
    is used instead. Only GET is accepted: any other method receives an
    ``HTTPMethodNotAllowed`` response.

    See https://docs.fanout.io/docs/subscribing
    """
    user: User = request.user
    channel = DEFAULT_FANOUT_CHANNEL if user is None else user.org_id

    # Guard clause: everything below only applies to GET subscriptions.
    if request.method.upper() != "GET":
        return webob.exc.HTTPMethodNotAllowed("Only GET requests supported")

    response = webob.Response("", content_type="text/event-stream")
    grip_headers = (
        ("Cache-Control", "no-cache"),
        ("Grip-Hold", "stream"),
        ("Grip-Channel", channel),
        # The backslashes are deliberately literal: the value is declared as
        # "format=cstring", so the escapes are sent as text, not as newlines.
        ("Grip-Keep-Alive", ":\\n\\n; format=cstring; timeout=20"),
    )
    for header_name, header_value in grip_headers:
        response.headers[header_name] = header_value
    return response
def fanout_envelope():
    """
    Return a fresh template for a publish envelope.

    The envelope holds a single item with a placeholder channel ("test") and
    a placeholder "http-stream" content string. A new nested structure is
    built on every call, so callers may mutate the result freely.
    """
    http_stream = {"content": "event: message\ndata: hello world\n\n"}
    item = {
        "channel": "test",
        "formats": {"http-stream": http_stream},
    }
    return {"items": [item]}
def send_update(
    event_id: int, event_type: str, evt_data: dict, channel: Optional[str] = None
):
    """
    Publish a single server-sent event to a Fanout.io channel.

    The event is formatted as an ``id`` / ``event`` / ``data`` message whose
    data line is ``evt_data`` serialised to JSON, then POSTed to the realm's
    publish endpoint using HTTP basic auth.

    Args:
        event_id: Numeric id, emitted as ``evt-<event_id>`` in the ``id`` field.
        event_type: Value of the ``event`` field.
        evt_data: Payload serialised with ``orjson`` (``json_default`` handles
            types orjson cannot serialise natively).
        channel: Target channel; ``DEFAULT_FANOUT_CHANNEL`` when ``None``.

    Returns:
        The response object returned by ``requests.post``.

    Raises:
        AssertionError: If ``fanout_realm_id`` or ``fanout_realm_key`` is not
            set in the configuration.
    """
    if channel is None:
        channel = DEFAULT_FANOUT_CHANNEL

    js_string = orjson.dumps(evt_data, default=json_default).decode("utf-8")
    content = f"id: evt-{event_id}\nevent: {event_type}\ndata: {js_string}\n\n"

    realm_id = conf.CONF.fanout_realm_id
    realm_key = conf.CONF.fanout_realm_key
    # Raised explicitly instead of using ``assert`` statements: asserts are
    # removed under ``python -O``, which would let ``None`` leak into the
    # credentials and the URL. AssertionError is kept as the exception type
    # so existing callers see the same failure as before.
    if realm_id is None:
        raise AssertionError("conf.CONF.fanout_realm_id is not configured")
    if realm_key is None:
        raise AssertionError("conf.CONF.fanout_realm_key is not configured")

    auth = HTTPBasicAuth(realm_id, realm_key)
    fanout_api_url = f"https://api.fanout.io/realm/{realm_id}/publish/"

    # Start from the template envelope and overwrite its placeholder values.
    envelope = fanout_envelope()
    msg_model = envelope["items"][0]
    msg_model["channel"] = channel
    msg_model["formats"]["http-stream"]["content"] = content

    return requests.post(fanout_api_url, json=envelope, auth=auth, timeout=3)
if __name__ == "__main__":
    # Manual smoke test: load settings from a local env file, publish one
    # sample event to "mychannel" and print the result of the publish call.
    from rfpy.utils import configure_rfpy

    payload = {"title": "Savage Enquiry", "id": 23432}
    configure_rfpy(env_file="~/projects/ssjava/collab/rfpy-settings.env")
    print(send_update(22, "ISSUE_RELEASED", payload, "mychannel"))