Coverage for rfpy/utils.py: 97%

75 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-24 10:52 +0000

1import io 

2import time 

3import pstats 

4import logging 

5import logging.config 

6import cProfile 

7import contextlib 

8from enum import Enum 

9from decimal import Decimal 

10 

11from sqlalchemy import text 

12from sqlalchemy.engine import create_engine 

13from pydantic import AnyHttpUrl 

14from cryptography.fernet import Fernet 

15 

16from rfpy.model.audit import visible 

17from rfpy.conf.settings import AppSettings, RunMode 

18from rfpy import conf 

19 

20 

21bench_log = logging.getLogger("rfpy.benchmark") 

22config_log = logging.getLogger("rfpy.utils.config") 

23log = logging.getLogger(__name__) 

24 

25 

26def configure_rfpy(env_file=None) -> "AppSettings": 

27 if env_file: 

28 # This is valid for Pydantic 2. MyPy error is presumably a bug 

29 conf.CONF = AppSettings(_env_file=env_file) # type: ignore 

30 else: 

31 conf.CONF = AppSettings() 

32 

33 configure_logging() 

34 log_setup_details(env_file) 

35 visible.check_all_events_assigned() 

36 return conf.CONF 

37 

38 

39def build_engine(echo=False): 

40 if conf.CONF is None: 

41 raise Exception("--!!--rfpy not configured. Call utils.configure_rfpy()--") 

42 

43 dsn = conf.CONF.sqlalchemy_dsn 

44 engine = create_engine(dsn, echo=echo, pool_recycle=3600) 

45 try: 

46 conn = engine.connect() 

47 conn.execute(text("select count(id) from projects")) 

48 conn.close() 

49 except Exception as e: 

50 raise Exception( 

51 f"\n\nUnable to connect to database with {dsn},\n\nError: {e}\n\n" 

52 ) 

53 log.info("Created mysql engine with %s" % dsn) 

54 return engine 

55 

56 

57def log_setup_details(env_file): 

58 config_log.info("\n rfpy configured: ") 

59 if env_file is not None: 

60 config_log.info("-- using settings env file: %s", env_file) 

61 config_log.info("-- mode: %s", conf.CONF.run_mode) 

62 config_log.info("-- db_url: %s", conf.CONF.conn_string()) 

63 config_log.info("-- mailer: %s" % conf.CONF.mailer) 

64 config_log.info("-- email_to_override: %s" % conf.CONF.email_to_override) 

65 if conf.CONF.postmark_api_key: 

66 config_log.info("-- postmark-api-key set") 

67 if not conf.CONF.fanout_realm_id: 

68 config_log.info( 

69 "-- Fanout configured for realm ID %s", conf.CONF.fanout_realm_id 

70 ) 

71 else: 

72 config_log.info("-- Fanout disabled (no fanout-realm-id set") 

73 

74 

75class benchmark(object): # pragma: no cover 

76 def __init__(self, name, sensitivity=1): 

77 self.name = name 

78 self.sensitivity = sensitivity 

79 

80 def __enter__(self): 

81 self.start = time.time() 

82 

83 def __exit__(self, ty, val, tb): 

84 elapsed = time.time() - self.start 

85 if elapsed < (0.1 / self.sensitivity): 

86 level = logging.DEBUG 

87 elif elapsed < (0.5 / self.sensitivity): 

88 level = logging.INFO 

89 elif elapsed < (2 / self.sensitivity): 

90 level = logging.WARN 

91 else: 

92 level = logging.ERROR 

93 bench_log.log(level, "%s : %0.3f seconds", self.name, elapsed) 

94 return False 

95 

96 

97@contextlib.contextmanager # pragma: no cover 

98def profiled(): 

99 pr = cProfile.Profile() 

100 pr.enable() 

101 yield pr 

102 pr.disable() 

103 s = io.StringIO() 

104 ps = pstats.Stats(pr, stream=s).sort_stats("cumulative") 

105 # ps.print_stats(1000, 'rfpy') 

106 ps.print_stats() 

107 # uncomment this to see who's calling what 

108 # ps.print_callers() 

109 print(s.getvalue()) 

110 

111 

112def configure_logging(log_config_dict=None): # pragma: no cover 

113 if log_config_dict is not None: 

114 logging.config.dictConfig(log_config_dict) 

115 return 

116 from rfpy.conf import logconf 

117 

118 if conf.CONF.run_mode == RunMode.production: 

119 email = "[email protected]" 

120 logconf.PRODUCTION["handlers"]["email"]["fromaddr"] = email 

121 subject = "RFPY Error on %s" % conf.CONF.webapp_hostname 

122 logconf.PRODUCTION["handlers"]["email"]["subject"] = subject 

123 logging.config.dictConfig(logconf.PRODUCTION) 

124 elif conf.CONF.run_mode == RunMode.development: 

125 logging.config.dictConfig(logconf.DEVELOPMENT) 

126 elif conf.CONF.run_mode == RunMode.test: 

127 logging.config.dictConfigClass(logconf.TEST) 

128 else: 

129 raise ValueError("Cannot configure logging without valid RunMode") 

130 

131 

132def json_default(obj): 

133 if isinstance(obj, set): 

134 return list(obj) 

135 elif isinstance(obj, Decimal): 

136 return float(obj) 

137 elif hasattr(obj, "as_dict"): 

138 return obj.as_dict() 

139 elif isinstance(obj, Enum): 

140 return obj.value 

141 elif isinstance(obj, AnyHttpUrl): 

142 return str(obj) 

143 else: 

144 raise TypeError("Cannot serialise %s to json." % type(obj)) 

145 

146 

147def encrypt_value(unicode_val): 

148 fern = Fernet(conf.CONF.crypt_key) 

149 bytes_val = unicode_val.encode("utf8") 

150 return fern.encrypt(bytes_val).decode("utf8") 

151 

152 

153def decrypt_value(byte_string, max_age=None): 

154 fern = Fernet(conf.CONF.crypt_key) 

155 if max_age is None: 

156 bytes_unencrypted = fern.decrypt(byte_string) 

157 else: 

158 bytes_unencrypted = fern.decrypt(byte_string, ttl=max_age) 

159 return bytes_unencrypted.decode("utf8") 

160 

161 

162if __name__ == "__main__": # pragma: no cover 

163 configure_rfpy()