Coverage for fuzzycocopython / utils.py: 94%

240 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-31 09:36 +0000

1import numpy as np 

2import pandas as pd 

3from lfa_toolbox.core.lv.linguistic_variable import LinguisticVariable 

4from lfa_toolbox.core.mf.lin_piece_wise_mf import LeftShoulderMF, RightShoulderMF, TriangularMF 

5from lfa_toolbox.core.mf.singleton_mf import SingletonMF 

6from lfa_toolbox.core.rules.default_fuzzy_rule import DefaultFuzzyRule 

7from lfa_toolbox.core.rules.fuzzy_rule import FuzzyRule 

8from lfa_toolbox.core.rules.fuzzy_rule_element import Antecedent, Consequent 

9 

10from ._fuzzycoco_core import FuzzyCocoParams 

11 

12_METRICS_KEYS = ( 

13 "sensitivity", 

14 "specificity", 

15 "accuracy", 

16 "ppv", 

17 "rmse", 

18 "rrse", 

19 "rae", 

20 "mse", 

21 "distanceThreshold", 

22 "distanceMinThreshold", 

23 "nb_vars", 

24 "overLearn", 

25 "true_positives", 

26 "false_positives", 

27 "true_negatives", 

28 "false_negatives", 

29) 

30 

31 

def create_input_linguistic_variable(var_name, set_items):
    """Build an input ``LinguisticVariable`` from positioned fuzzy sets.

    ``set_items`` is a list of ``{"name": ..., "position": ...}`` dicts.
    Sets are ordered by position; the first gets a left shoulder, the last
    a right shoulder, and any interior set a triangular MF anchored on its
    neighbours' positions.
    """
    ordered = sorted(set_items, key=lambda item: item.get("position", 0))
    count = len(ordered)
    mfs = {}
    for idx, item in enumerate(ordered):
        pos = item.get("position")
        label = item.get("name")
        if count == 1:
            # Single-set variable: shoulder over an arbitrary one-unit span.
            mfs[label] = LeftShoulderMF(pos, pos + 1)
        elif idx == 0:
            mfs[label] = LeftShoulderMF(pos, ordered[1]["position"])
        elif idx == count - 1:
            mfs[label] = RightShoulderMF(ordered[idx - 1]["position"], pos)
        else:
            mfs[label] = TriangularMF(
                ordered[idx - 1]["position"], pos, ordered[idx + 1]["position"]
            )
    return LinguisticVariable(var_name, mfs)

56 

57 

def create_output_linguistic_variable(var_name, set_items):
    """Build an output ``LinguisticVariable`` where every set is a singleton MF."""
    ordered = sorted(set_items, key=lambda item: item.get("position", 0))
    mfs = {item.get("name"): SingletonMF(item.get("position")) for item in ordered}
    return LinguisticVariable(var_name, mfs)

67 

68 

def generate_generic_labels(n):
    """Return ``n`` human-readable fuzzy-set labels (Low/Medium/High style).

    Counts 1-8 use curated label sequences (even counts deliberately omit
    "Medium"); any other count falls back to numbered "Set i" labels.
    """
    presets = {
        1: ["Medium"],
        2: ["Low", "High"],
        3: ["Low", "Medium", "High"],
        4: ["Very Low", "Low", "High", "Very High"],
        5: ["Very Low", "Low", "Medium", "High", "Very High"],
        6: [
            "Very Low",
            "Low",
            "Slightly Low",
            "Slightly High",
            "High",
            "Very High",
        ],
        7: [
            "Very Low",
            "Low",
            "Slightly Low",
            "Medium",
            "Slightly High",
            "High",
            "Very High",
        ],
        8: [
            "Very Very Low",
            "Very Low",
            "Low",
            "Slightly Low",
            "Slightly High",
            "High",
            "Very High",
            "Very Very High",
        ],
    }
    if n in presets:
        return presets[n]
    return [f"Set {i + 1}" for i in range(n)]

113 

114 

115def _auto_bits(n): 

116 if n is None or n < 2: 

117 return 0 

118 return int(np.ceil(np.log2(n))) 

119 

120 

def build_fuzzycoco_params(
    *,
    nb_features,
    n_outputs,
    nb_rules,
    nb_max_var_per_rule,
    max_generations,
    max_fitness,
    nb_cooperators,
    influence_rules_initial_population,
    influence_evolving_ratio,
    nb_sets_in,
    nb_sets_out,
    nb_bits_pos_in,
    nb_bits_pos_out,
    nb_bits_vars_in=None,
    nb_bits_sets_in=None,
    nb_bits_vars_out=None,
    nb_bits_sets_out=None,
    pop_size_rules,
    elite_size_rules,
    cx_prob_rules,
    mut_flip_genome_rules,
    mut_flip_bit_rules,
    pop_size_mfs,
    elite_size_mfs,
    cx_prob_mfs,
    mut_flip_genome_mfs,
    mut_flip_bit_mfs,
    threshold,
    metrics_weights=None,
    features_weights=None,
):
    """Build a ``FuzzyCocoParams`` instance using explicit estimator parameters.

    Bit widths for variable/set genome fields are taken from the explicit
    overrides when given, otherwise derived automatically from the number of
    variables/sets.  ``metrics_weights`` is validated against ``_METRICS_KEYS``
    and normalized so every known metric has a float weight (default 0.0).

    Raises:
        ValueError: if ``metrics_weights`` contains unknown keys.
        TypeError: if a ``metrics_weights`` value is not convertible to float.
    """

    def _resolve_var_bits(count, override):
        # Explicit override wins; otherwise derive from the count with one
        # extra bit beyond the minimal encoding.  None propagates as "unset".
        if override is not None:
            return override
        if count is None:
            return None
        return _auto_bits(count) + 1

    def _resolve_set_bits(count, override):
        return override if override is not None else _auto_bits(count)

    def _var_params(nb_sets, nb_bits_pos, bits_vars, bits_sets):
        # Only include the optional bit-width keys when they were resolved.
        params = {"nb_sets": nb_sets, "nb_bits_pos": nb_bits_pos}
        if bits_vars is not None:
            params["nb_bits_vars"] = bits_vars
        if bits_sets is not None:
            params["nb_bits_sets"] = bits_sets
        return params

    input_params = _var_params(
        nb_sets_in,
        nb_bits_pos_in,
        _resolve_var_bits(nb_features, nb_bits_vars_in),
        _resolve_set_bits(nb_sets_in, nb_bits_sets_in),
    )
    output_params = _var_params(
        nb_sets_out,
        nb_bits_pos_out,
        _resolve_var_bits(n_outputs, nb_bits_vars_out),
        _resolve_set_bits(nb_sets_out, nb_bits_sets_out),
    )

    rules_params = {
        "pop_size": pop_size_rules,
        "elite_size": elite_size_rules,
        "cx_prob": cx_prob_rules,
        "mut_flip_genome": mut_flip_genome_rules,
        "mut_flip_bit": mut_flip_bit_rules,
    }
    mfs_params = {
        "pop_size": pop_size_mfs,
        "elite_size": elite_size_mfs,
        "cx_prob": cx_prob_mfs,
        "mut_flip_genome": mut_flip_genome_mfs,
        "mut_flip_bit": mut_flip_bit_mfs,
    }

    fitness_params = {"output_vars_defuzz_thresholds": [float(threshold)]}
    if metrics_weights is not None:
        unknown_keys = sorted(set(metrics_weights) - set(_METRICS_KEYS))
        if unknown_keys:
            raise ValueError(
                f"Unknown metrics_weights keys: {', '.join(unknown_keys)}",
            )
        # Every known metric gets a weight; unspecified ones default to 0.0.
        normalized = dict.fromkeys(_METRICS_KEYS, 0.0)
        for key, value in metrics_weights.items():
            try:
                normalized[key] = float(value)
            except (TypeError, ValueError) as exc:
                raise TypeError(
                    f"metrics_weights[{key!r}] must be a real number, got {value!r}",
                ) from exc
        fitness_params["metrics_weights"] = normalized
    if features_weights:
        fitness_params["features_weights"] = dict(features_weights)

    desc = {
        "global_params": {
            "nb_rules": nb_rules,
            "nb_max_var_per_rule": nb_max_var_per_rule,
            "max_generations": max_generations,
            "max_fitness": max_fitness,
            "nb_cooperators": nb_cooperators,
            "influence_rules_initial_population": influence_rules_initial_population,
            "influence_evolving_ratio": influence_evolving_ratio,
        },
        "input_vars_params": input_params,
        "output_vars_params": output_params,
        "rules_params": rules_params,
        "mfs_params": mfs_params,
        "fitness_params": fitness_params,
    }
    return FuzzyCocoParams.from_dict(desc)

246 

247 

248def _parse_variables(desc): 

249 fs = desc.get("fuzzy_system", {}) 

250 return fs.get("variables", {}) 

251 

252 

253def _parse_rules(desc): 

254 fs = desc.get("fuzzy_system", {}) 

255 return fs.get("rules", {}) 

256 

257 

258def _parse_default_rules(desc): 

259 fs = desc.get("fuzzy_system", {}) 

260 return fs.get("default_rules", {}) 

261 

262 

def parse_fuzzy_system_from_description(desc):
    """High-level aggregator used by fit() to expose .variables_, .rules_, etc."""
    variables = _parse_variables(desc)
    rules = _parse_rules(desc)
    defaults = _parse_default_rules(desc)
    return variables, rules, defaults

270 

271 

def to_linguistic_components(variables_dict, rules_dict, default_rules_dict):
    """
    Takes variables_/rules_/default_rules_ (parsed from description)
    and returns:
        linguistic_variables, fuzzy_rules, default_rules
    built with the helper factories:
        - create_input_linguistic_variable
        - create_output_linguistic_variable
    """
    linguistic_variables = []
    lv_by_name = {}
    label_map = {}  # var -> {orig_set: generic_label}

    def _register(var_name, sets, factory):
        # Replace raw set names (e.g. 'sepal_length.1') with generic labels,
        # remembering the mapping so rules can be translated later.
        set_items = [{"name": k, "position": v} for k, v in sets.items()]
        generic = generate_generic_labels(len(set_items))
        for item, lbl in zip(set_items, generic, strict=False):
            label_map.setdefault(var_name, {})[item["name"]] = lbl
            item["name"] = lbl
        lv = factory(var_name, set_items)
        linguistic_variables.append(lv)
        lv_by_name[var_name] = lv

    for var_name, sets in variables_dict.get("input", {}).items():
        _register(var_name, sets, create_input_linguistic_variable)
    for var_name, sets in variables_dict.get("output", {}).items():
        _register(var_name, sets, create_output_linguistic_variable)

    # Fixed operators: min for both activation and implication.
    fixed_act = (np.min, "AND_min")
    fixed_imp = (np.min, "AND_min")

    def _label_of(var, mf_dict):
        # Each antecedent/consequent dict holds a single original set name.
        return label_map[var][next(iter(mf_dict.keys()))]

    fuzzy_rules = []
    for rule in rules_dict.values():
        ants = [
            Antecedent(lv_by_name[var], _label_of(var, mf_dict), is_not=False)
            for var, mf_dict in rule["antecedents"].items()
        ]
        cons = [
            Consequent(lv_by_name[var], _label_of(var, mf_dict))
            for var, mf_dict in rule["consequents"].items()
        ]
        fuzzy_rules.append(FuzzyRule(ants, fixed_act, cons, fixed_imp))

    default_rules = []
    for var, orig_set in default_rules_dict.items():
        cons = [Consequent(lv_by_name[var], label_map[var][orig_set])]
        default_rules.append(DefaultFuzzyRule(cons, fixed_imp))

    return linguistic_variables, fuzzy_rules, default_rules

337 

338 

339# ---- same helpers as before ---------------------------------------------- 

340 

341 

def _build_pos_and_label_maps(variables_dict):
    """Map generic labels to positions (and original set names to labels).

    Returns ``(pos_index, label_map)`` where
    ``pos_index[io][var][generic_label] = {"position": float, "orig_set": str}``
    and ``label_map[io][var][orig_set] = generic_label``.
    """
    pos_index = {"input": {}, "output": {}}
    label_map = {"input": {}, "output": {}}
    for io in ("input", "output"):
        for var, sets in variables_dict.get(io, {}).items():
            ordered = sorted(sets.items(), key=lambda kv: kv[1])  # by position
            labels = generate_generic_labels(len(ordered))
            var_pos = pos_index[io][var] = {}
            var_labels = label_map[io][var] = {}
            for (orig_set, pos), gen in zip(ordered, labels, strict=False):
                var_pos[gen] = {"position": float(pos), "orig_set": orig_set}
                var_labels[orig_set] = gen
    return pos_index, label_map

355 

356 

357def _format_var_lines_for_io(pos_index, io, digits=6): 

358 out = {} 

359 for var, label_info in pos_index.get(io, {}).items(): 

360 label_items = sorted(label_info.items(), key=lambda kv: kv[1]["position"]) 

361 out[var] = [f"{lbl} = {info['position']:.{digits}g} (from {info['orig_set']})" for lbl, info in label_items] 

362 return out 

363 

364 

365def _lookup_label_and_pos(pos_index, label_map, var, orig_set): 

366 io = "input" if var in pos_index["input"] else "output" 

367 lbl = label_map[io][var][orig_set] 

368 pos = pos_index[io][var][lbl]["position"] 

369 return lbl, pos, io 

370 

371 

372def _rule_key(name): 

373 num = "".join(ch for ch in name if ch.isdigit()) 

374 return (int(num) if num else 10**9, name) 

375 

376 

377# ---- VIEWS (human-readable text) ----------------------------------------- 

378 

379 

def to_views_components(variables_dict, rules_dict, default_rules_dict, *, digits=6):
    """Render variables, rules and default rules as human-readable strings.

    Returns ``(variables_view, rules_view, default_rules_view)`` where the
    first is a dict of per-variable text lines and the others are lists of
    formatted rule strings.
    """
    pos_index, label_map = _build_pos_and_label_maps(variables_dict)

    # Flat variable map; an output variable colliding with an input name is
    # stored under a suffixed key instead of overwriting it.
    variables_view = dict(_format_var_lines_for_io(pos_index, "input", digits))
    for var, lines in _format_var_lines_for_io(pos_index, "output", digits).items():
        key = var if var not in variables_view else f"{var} (output)"  # collision guard
        variables_view[key] = lines

    def _part_to_text(part_dict, empty_text):
        # Shared renderer for antecedent and consequent halves of a rule.
        parts = []
        for var, mf_dict in part_dict.items():
            orig_set = next(iter(mf_dict.keys()))
            lbl, pos, _ = _lookup_label_and_pos(pos_index, label_map, var, orig_set)
            parts.append(f"{var} is {lbl} ({pos:.{digits}g})")
        return " AND ".join(parts) if parts else empty_text

    rules_view = []
    for rname, rdef in sorted(rules_dict.items(), key=lambda kv: _rule_key(kv[0])):
        ants = _part_to_text(rdef.get("antecedents", {}), "TRUE")
        cons = _part_to_text(rdef.get("consequents", {}), "(no consequent)")
        rules_view.append(f"{rname.upper()}: IF {ants} THEN {cons}")

    default_rules_view = []
    for var, orig_set in default_rules_dict.items():
        lbl, pos, _ = _lookup_label_and_pos(pos_index, label_map, var, orig_set)
        default_rules_view.append(f"DEFAULT: {var} is {lbl} ({pos:.{digits}g})")

    return variables_view, rules_view, default_rules_view

421 

422 

423# ---- TABLES (machine-friendly; great in notebooks) ----------------------- 

424 

425 

def to_tables_components(variables_dict, rules_dict, default_rules_dict):
    """Build machine-friendly DataFrames for variables and rules.

    Returns ``(variables_df, rules_df)``.  Default rules are included in
    ``rules_df`` with ``rule='default'`` / ``role='default'``.

    Fix: explicit ``columns=`` lists are passed to the DataFrame
    constructors so that empty descriptions produce well-formed empty
    frames instead of ``sort_values`` raising ``KeyError`` on a
    column-less DataFrame.
    """
    pos_index, label_map = _build_pos_and_label_maps(variables_dict)

    var_columns = ["io", "var", "label", "position", "orig_set"]
    rule_columns = ["rule", "role", "io", "var", "label", "position", "orig_set"]

    # variables_df: one row per (variable, fuzzy set)
    rows_vars = []
    for io in ("input", "output"):
        for var, label_info in pos_index[io].items():
            for lbl, info in label_info.items():
                rows_vars.append(
                    {
                        "io": io,
                        "var": var,
                        "label": lbl,
                        "position": info["position"],
                        "orig_set": info["orig_set"],
                    }
                )
    variables_df = (
        pd.DataFrame(rows_vars, columns=var_columns)
        .sort_values(["io", "var", "position"])
        .reset_index(drop=True)
    )

    # rules_df: one row per antecedent/consequent element
    rows_rules = []
    for rname, rdef in sorted(rules_dict.items(), key=lambda kv: _rule_key(kv[0])):
        for role, part in (
            ("antecedent", rdef.get("antecedents", {})),
            ("consequent", rdef.get("consequents", {})),
        ):
            for var, mf_dict in part.items():
                orig_set = next(iter(mf_dict.keys()))
                lbl, pos, io = _lookup_label_and_pos(pos_index, label_map, var, orig_set)
                rows_rules.append(
                    {
                        "rule": rname,
                        "role": role,
                        "io": io,
                        "var": var,
                        "label": lbl,
                        "position": pos,
                        "orig_set": orig_set,
                    }
                )
    # defaults as rule rows too (role='default')
    for var, orig_set in default_rules_dict.items():
        lbl, pos, io = _lookup_label_and_pos(pos_index, label_map, var, orig_set)
        rows_rules.append(
            {
                "rule": "default",
                "role": "default",
                "io": io,
                "var": var,
                "label": lbl,
                "position": pos,
                "orig_set": orig_set,
            }
        )

    rules_df = (
        pd.DataFrame(rows_rules, columns=rule_columns)
        .sort_values(["rule", "role", "var"])
        .reset_index(drop=True)
    )
    return variables_df, rules_df