Coverage for fuzzycocopython / utils.py: 94%
240 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-31 09:36 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-31 09:36 +0000
1import numpy as np
2import pandas as pd
3from lfa_toolbox.core.lv.linguistic_variable import LinguisticVariable
4from lfa_toolbox.core.mf.lin_piece_wise_mf import LeftShoulderMF, RightShoulderMF, TriangularMF
5from lfa_toolbox.core.mf.singleton_mf import SingletonMF
6from lfa_toolbox.core.rules.default_fuzzy_rule import DefaultFuzzyRule
7from lfa_toolbox.core.rules.fuzzy_rule import FuzzyRule
8from lfa_toolbox.core.rules.fuzzy_rule_element import Antecedent, Consequent
10from ._fuzzycoco_core import FuzzyCocoParams
12_METRICS_KEYS = (
13 "sensitivity",
14 "specificity",
15 "accuracy",
16 "ppv",
17 "rmse",
18 "rrse",
19 "rae",
20 "mse",
21 "distanceThreshold",
22 "distanceMinThreshold",
23 "nb_vars",
24 "overLearn",
25 "true_positives",
26 "false_positives",
27 "true_negatives",
28 "false_negatives",
29)
32def create_input_linguistic_variable(var_name, set_items):
33 set_items = sorted(set_items, key=lambda s: s.get("position", 0))
34 n = len(set_items)
35 ling_values_dict = {}
36 for i, s in enumerate(set_items):
37 pos = s.get("position")
38 # We ignore the original "name" and use the provided generic label.
39 set_name = s.get("name")
40 if n == 1: 40 ↛ 41line 40 didn't jump to line 41 because the condition on line 40 was never true
41 mf = LeftShoulderMF(pos, pos + 1)
42 elif n == 2:
43 if i == 0:
44 mf = LeftShoulderMF(pos, set_items[i + 1]["position"])
45 else:
46 mf = RightShoulderMF(set_items[i - 1]["position"], pos)
47 else:
48 if i == 0:
49 mf = LeftShoulderMF(pos, set_items[i + 1]["position"])
50 elif i == n - 1:
51 mf = RightShoulderMF(set_items[i - 1]["position"], pos)
52 else:
53 mf = TriangularMF(set_items[i - 1]["position"], pos, set_items[i + 1]["position"])
54 ling_values_dict[set_name] = mf
55 return LinguisticVariable(var_name, ling_values_dict)
58def create_output_linguistic_variable(var_name, set_items):
59 set_items = sorted(set_items, key=lambda s: s.get("position", 0))
60 ling_values_dict = {}
61 for s in set_items:
62 pos = s.get("position")
63 set_name = s.get("name")
64 mf = SingletonMF(pos)
65 ling_values_dict[set_name] = mf
66 return LinguisticVariable(var_name, ling_values_dict)
69def generate_generic_labels(n):
70 match n:
71 case 1:
72 return ["Medium"]
73 case 2:
74 return ["Low", "High"]
75 case 3:
76 return ["Low", "Medium", "High"]
77 case 4:
78 return ["Very Low", "Low", "High", "Very High"]
79 case 5: 79 ↛ 80line 79 didn't jump to line 80 because the pattern on line 79 never matched
80 return ["Very Low", "Low", "Medium", "High", "Very High"]
81 case 6:
82 return [
83 "Very Low",
84 "Low",
85 "Slightly Low",
86 "Slightly High",
87 "High",
88 "Very High",
89 ]
90 case 7: 90 ↛ 91line 90 didn't jump to line 91 because the pattern on line 90 never matched
91 return [
92 "Very Low",
93 "Low",
94 "Slightly Low",
95 "Medium",
96 "Slightly High",
97 "High",
98 "Very High",
99 ]
100 case 8: 100 ↛ 111line 100 didn't jump to line 111 because the pattern on line 100 always matched
101 return [
102 "Very Very Low",
103 "Very Low",
104 "Low",
105 "Slightly Low",
106 "Slightly High",
107 "High",
108 "Very High",
109 "Very Very High",
110 ]
111 case _:
112 return [f"Set {i + 1}" for i in range(n)]
115def _auto_bits(n):
116 if n is None or n < 2:
117 return 0
118 return int(np.ceil(np.log2(n)))
121def build_fuzzycoco_params(
122 *,
123 nb_features,
124 n_outputs,
125 nb_rules,
126 nb_max_var_per_rule,
127 max_generations,
128 max_fitness,
129 nb_cooperators,
130 influence_rules_initial_population,
131 influence_evolving_ratio,
132 nb_sets_in,
133 nb_sets_out,
134 nb_bits_pos_in,
135 nb_bits_pos_out,
136 nb_bits_vars_in=None,
137 nb_bits_sets_in=None,
138 nb_bits_vars_out=None,
139 nb_bits_sets_out=None,
140 pop_size_rules,
141 elite_size_rules,
142 cx_prob_rules,
143 mut_flip_genome_rules,
144 mut_flip_bit_rules,
145 pop_size_mfs,
146 elite_size_mfs,
147 cx_prob_mfs,
148 mut_flip_genome_mfs,
149 mut_flip_bit_mfs,
150 threshold,
151 metrics_weights=None,
152 features_weights=None,
153):
154 """Build a ``FuzzyCocoParams`` instance using explicit estimator parameters."""
156 def _bits_vars(value, override):
157 if override is not None:
158 return override
159 if value is None: 159 ↛ 160line 159 didn't jump to line 160 because the condition on line 159 was never true
160 return None
161 return _auto_bits(value) + 1
163 def _bits_sets(sets, override):
164 if override is not None:
165 return override
166 return _auto_bits(sets)
168 bits_vars_in = _bits_vars(nb_features, nb_bits_vars_in)
169 bits_sets_in = _bits_sets(nb_sets_in, nb_bits_sets_in)
170 bits_vars_out = _bits_vars(n_outputs, nb_bits_vars_out)
171 bits_sets_out = _bits_sets(nb_sets_out, nb_bits_sets_out)
173 input_params = {
174 "nb_sets": nb_sets_in,
175 "nb_bits_pos": nb_bits_pos_in,
176 }
177 if bits_vars_in is not None: 177 ↛ 179line 177 didn't jump to line 179 because the condition on line 177 was always true
178 input_params["nb_bits_vars"] = bits_vars_in
179 if bits_sets_in is not None: 179 ↛ 182line 179 didn't jump to line 182 because the condition on line 179 was always true
180 input_params["nb_bits_sets"] = bits_sets_in
182 output_params = {
183 "nb_sets": nb_sets_out,
184 "nb_bits_pos": nb_bits_pos_out,
185 }
186 if bits_vars_out is not None: 186 ↛ 188line 186 didn't jump to line 188 because the condition on line 186 was always true
187 output_params["nb_bits_vars"] = bits_vars_out
188 if bits_sets_out is not None: 188 ↛ 191line 188 didn't jump to line 191 because the condition on line 188 was always true
189 output_params["nb_bits_sets"] = bits_sets_out
191 rules_params = {
192 "pop_size": pop_size_rules,
193 "elite_size": elite_size_rules,
194 "cx_prob": cx_prob_rules,
195 "mut_flip_genome": mut_flip_genome_rules,
196 "mut_flip_bit": mut_flip_bit_rules,
197 }
199 mfs_params = {
200 "pop_size": pop_size_mfs,
201 "elite_size": elite_size_mfs,
202 "cx_prob": cx_prob_mfs,
203 "mut_flip_genome": mut_flip_genome_mfs,
204 "mut_flip_bit": mut_flip_bit_mfs,
205 }
207 fitness_params = {
208 "output_vars_defuzz_thresholds": [float(threshold)],
209 }
210 if metrics_weights is not None: 210 ↛ 225line 210 didn't jump to line 225 because the condition on line 210 was always true
211 unknown_keys = sorted(set(metrics_weights) - set(_METRICS_KEYS))
212 if unknown_keys:
213 raise ValueError(
214 f"Unknown metrics_weights keys: {', '.join(unknown_keys)}",
215 )
216 normalized = {key: 0.0 for key in _METRICS_KEYS}
217 for key, value in metrics_weights.items():
218 try:
219 normalized[key] = float(value)
220 except (TypeError, ValueError) as exc:
221 raise TypeError(
222 f"metrics_weights[{key!r}] must be a real number, got {value!r}",
223 ) from exc
224 fitness_params["metrics_weights"] = normalized
225 if features_weights: 225 ↛ 226line 225 didn't jump to line 226 because the condition on line 225 was never true
226 fitness_params["features_weights"] = dict(features_weights)
228 desc = {
229 "global_params": {
230 "nb_rules": nb_rules,
231 "nb_max_var_per_rule": nb_max_var_per_rule,
232 "max_generations": max_generations,
233 "max_fitness": max_fitness,
234 "nb_cooperators": nb_cooperators,
235 "influence_rules_initial_population": influence_rules_initial_population,
236 "influence_evolving_ratio": influence_evolving_ratio,
237 },
238 "input_vars_params": input_params,
239 "output_vars_params": output_params,
240 "rules_params": rules_params,
241 "mfs_params": mfs_params,
242 "fitness_params": fitness_params,
243 }
245 return FuzzyCocoParams.from_dict(desc)
248def _parse_variables(desc):
249 fs = desc.get("fuzzy_system", {})
250 return fs.get("variables", {})
253def _parse_rules(desc):
254 fs = desc.get("fuzzy_system", {})
255 return fs.get("rules", {})
258def _parse_default_rules(desc):
259 fs = desc.get("fuzzy_system", {})
260 return fs.get("default_rules", {})
263def parse_fuzzy_system_from_description(desc):
264 """High‑level aggregator used by fit() to expose .variables_, .rules_, etc."""
265 return (
266 _parse_variables(desc),
267 _parse_rules(desc),
268 _parse_default_rules(desc),
269 )
272def to_linguistic_components(variables_dict, rules_dict, default_rules_dict):
273 """
274 Takes variables_/rules_/default_rules_ (parsed from description)
275 and returns:
276 linguistic_variables, fuzzy_rules, default_rules
277 using the user-provided helper factories:
278 - create_input_linguistic_variable
279 - create_output_linguistic_variable
280 """
281 # ---- build LinguisticVariable objects ------------------------
282 linguistic_variables = []
283 lv_by_name = {}
284 label_map = {} # var -> {orig_set: generic_label}
286 # input vars
287 for var_name, sets in variables_dict.get("input", {}).items():
288 # sets: {'sepal_length.1': pos, 'sepal_length.2': pos, ...}
289 set_items = [{"name": k, "position": v} for k, v in sets.items()]
290 generic = generate_generic_labels(len(set_items))
291 for s, lbl in zip(set_items, generic, strict=False):
292 label_map.setdefault(var_name, {})[s["name"]] = lbl
293 s["name"] = lbl
294 lv = create_input_linguistic_variable(var_name, set_items)
295 linguistic_variables.append(lv)
296 lv_by_name[var_name] = lv
298 # output vars
299 for var_name, sets in variables_dict.get("output", {}).items():
300 set_items = [{"name": k, "position": v} for k, v in sets.items()]
301 generic = generate_generic_labels(len(set_items))
302 for s, lbl in zip(set_items, generic, strict=False):
303 label_map.setdefault(var_name, {})[s["name"]] = lbl
304 s["name"] = lbl
305 lv = create_output_linguistic_variable(var_name, set_items)
306 linguistic_variables.append(lv)
307 lv_by_name[var_name] = lv
309 # ---- build FuzzyRule / DefaultFuzzyRule ----------------------
310 fixed_act = (np.min, "AND_min")
311 fixed_imp = (np.min, "AND_min")
312 fuzzy_rules = []
313 default_rules = []
315 for _, rule in rules_dict.items():
316 ants = []
317 for var, mf_dict in rule["antecedents"].items():
318 orig_set = next(iter(mf_dict.keys()))
319 label = label_map[var][orig_set]
320 ants.append(Antecedent(lv_by_name[var], label, is_not=False))
322 cons = []
323 for var, mf_dict in rule["consequents"].items():
324 orig_set = next(iter(mf_dict.keys()))
325 label = label_map[var][orig_set]
326 cons.append(Consequent(lv_by_name[var], label))
328 fuzzy_rules.append(FuzzyRule(ants, fixed_act, cons, fixed_imp))
330 # default consequents
331 for var, orig_set in default_rules_dict.items():
332 label = label_map[var][orig_set]
333 cons = [Consequent(lv_by_name[var], label)]
334 default_rules.append(DefaultFuzzyRule(cons, fixed_imp))
336 return linguistic_variables, fuzzy_rules, default_rules
339# ---- same helpers as before ----------------------------------------------
342def _build_pos_and_label_maps(variables_dict):
343 pos_index = {"input": {}, "output": {}}
344 label_map = {"input": {}, "output": {}}
345 for io in ("input", "output"):
346 for var, sets in variables_dict.get(io, {}).items():
347 items = sorted(sets.items(), key=lambda kv: kv[1]) # by position
348 generic = generate_generic_labels(len(items))
349 pos_index[io][var] = {}
350 label_map[io][var] = {}
351 for (orig_set, pos), gen in zip(items, generic, strict=False):
352 pos_index[io][var][gen] = {"position": float(pos), "orig_set": orig_set}
353 label_map[io][var][orig_set] = gen
354 return pos_index, label_map
357def _format_var_lines_for_io(pos_index, io, digits=6):
358 out = {}
359 for var, label_info in pos_index.get(io, {}).items():
360 label_items = sorted(label_info.items(), key=lambda kv: kv[1]["position"])
361 out[var] = [f"{lbl} = {info['position']:.{digits}g} (from {info['orig_set']})" for lbl, info in label_items]
362 return out
365def _lookup_label_and_pos(pos_index, label_map, var, orig_set):
366 io = "input" if var in pos_index["input"] else "output"
367 lbl = label_map[io][var][orig_set]
368 pos = pos_index[io][var][lbl]["position"]
369 return lbl, pos, io
372def _rule_key(name):
373 num = "".join(ch for ch in name if ch.isdigit())
374 return (int(num) if num else 10**9, name)
377# ---- VIEWS (human-readable text) -----------------------------------------
380def to_views_components(variables_dict, rules_dict, default_rules_dict, *, digits=6):
381 pos_index, label_map = _build_pos_and_label_maps(variables_dict)
383 # flat variable map
384 vars_in = _format_var_lines_for_io(pos_index, "input", digits)
385 vars_out = _format_var_lines_for_io(pos_index, "output", digits)
386 variables_view = dict(vars_in)
387 for var, lines in vars_out.items():
388 if var not in variables_view: 388 ↛ 391line 388 didn't jump to line 391 because the condition on line 388 was always true
389 variables_view[var] = lines
390 else:
391 variables_view[f"{var} (output)"] = lines # collision guard
393 def ants_to_text(ant_dict):
394 parts = []
395 for var, mf_dict in ant_dict.items():
396 orig_set = next(iter(mf_dict.keys()))
397 lbl, pos, _ = _lookup_label_and_pos(pos_index, label_map, var, orig_set)
398 parts.append(f"{var} is {lbl} ({pos:.{digits}g})")
399 return " AND ".join(parts) if parts else "TRUE"
401 def cons_to_text(cons_dict):
402 parts = []
403 for var, mf_dict in cons_dict.items():
404 orig_set = next(iter(mf_dict.keys()))
405 lbl, pos, _ = _lookup_label_and_pos(pos_index, label_map, var, orig_set)
406 parts.append(f"{var} is {lbl} ({pos:.{digits}g})")
407 return " AND ".join(parts) if parts else "(no consequent)"
409 rules_view = []
410 for rname, rdef in sorted(rules_dict.items(), key=lambda kv: _rule_key(kv[0])):
411 ants = ants_to_text(rdef.get("antecedents", {}))
412 cons = cons_to_text(rdef.get("consequents", {}))
413 rules_view.append(f"{rname.upper()}: IF {ants} THEN {cons}")
415 default_rules_view = []
416 for var, orig_set in default_rules_dict.items():
417 lbl, pos, _ = _lookup_label_and_pos(pos_index, label_map, var, orig_set)
418 default_rules_view.append(f"DEFAULT: {var} is {lbl} ({pos:.{digits}g})")
420 return variables_view, rules_view, default_rules_view
423# ---- TABLES (machine-friendly; great in notebooks) -----------------------
426def to_tables_components(variables_dict, rules_dict, default_rules_dict):
427 pos_index, label_map = _build_pos_and_label_maps(variables_dict)
429 # variables_df
430 rows_vars = []
431 for io in ("input", "output"):
432 for var, label_info in pos_index[io].items():
433 for lbl, info in label_info.items():
434 rows_vars.append(
435 {
436 "io": io,
437 "var": var,
438 "label": lbl,
439 "position": info["position"],
440 "orig_set": info["orig_set"],
441 }
442 )
443 variables_df = pd.DataFrame(rows_vars).sort_values(["io", "var", "position"]).reset_index(drop=True)
445 # rules_df
446 rows_rules = []
447 for rname, rdef in sorted(rules_dict.items(), key=lambda kv: _rule_key(kv[0])):
448 for role, part in (
449 ("antecedent", rdef.get("antecedents", {})),
450 ("consequent", rdef.get("consequents", {})),
451 ):
452 for var, mf_dict in part.items():
453 orig_set = next(iter(mf_dict.keys()))
454 lbl, pos, io = _lookup_label_and_pos(pos_index, label_map, var, orig_set)
455 rows_rules.append(
456 {
457 "rule": rname,
458 "role": role,
459 "io": io,
460 "var": var,
461 "label": lbl,
462 "position": pos,
463 "orig_set": orig_set,
464 }
465 )
466 # defaults as rule rows too (role='default')
467 for var, orig_set in default_rules_dict.items():
468 lbl, pos, io = _lookup_label_and_pos(pos_index, label_map, var, orig_set)
469 rows_rules.append(
470 {
471 "rule": "default",
472 "role": "default",
473 "io": io,
474 "var": var,
475 "label": lbl,
476 "position": pos,
477 "orig_set": orig_set,
478 }
479 )
481 rules_df = pd.DataFrame(rows_rules).sort_values(["rule", "role", "var"]).reset_index(drop=True)
482 return variables_df, rules_df