Coverage for fuzzycocopython / fuzzycoco_base.py: 77%

657 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-31 09:36 +0000

1from __future__ import annotations 

2 

3import copy 

4import os 

5from collections.abc import Callable, Mapping, Sequence 

6from dataclasses import dataclass 

7 

8import joblib 

9import numpy as np 

10import pandas as pd 

11from sklearn.base import BaseEstimator, ClassifierMixin, RegressorMixin 

12from sklearn.metrics import get_scorer 

13from sklearn.utils import check_random_state 

14from sklearn.utils.validation import check_array, check_is_fitted, check_X_y 

15 

16from ._fuzzycoco_core import DataFrame, FuzzyCoco, FuzzyCocoParams, FuzzySystem, RandomGenerator 

17from .fuzzycoco_plot_mixin import FuzzyCocoPlotMixin 

18from .utils import ( 

19 build_fuzzycoco_params, 

20 generate_generic_labels, 

21 parse_fuzzy_system_from_description, 

22 to_linguistic_components, 

23 to_tables_components, 

24 to_views_components, 

25) 

26 

# Sentinel for missing values exchanged with the C++ engine: the most negative
# finite float64. Fire levels at or below this value are treated as "missing"
# (see _default_rule_activations_from_levels).
_MISSING_DATA_DOUBLE = np.finfo(np.float64).min

28 

29 

@dataclass(frozen=True)
class FitStepInfo:
    """Snapshot of one training generation.

    Instances are handed to the ``fit_stepwise`` callback after each
    generation (see ``fit_stepwise``), letting callers monitor training and
    optionally stop it early.
    """

    # index of the generation this snapshot describes (presumably 0-based — TODO confirm)
    generation: int
    # fitness value reported for this generation
    fitness: float
    # fitness values recorded so far, in generation order
    history: Sequence[float]
    # handle to the underlying C++ engine at this point of training
    model: FuzzyCoco
    # the estimator instance being fitted
    estimator: _FuzzyCocoBase

39 

40 

class RuleActivations(np.ndarray):
    """NumPy array of rule fire levels carrying default rule activations as metadata."""

    def __new__(cls, activations, default_rules=None):
        # View the raw fire levels as an instance of this subclass.
        instance = np.asarray(activations, dtype=float).view(cls)
        # A falsy mapping (None or empty) collapses to None; otherwise keep a copy.
        instance.default_rules = dict(default_rules) if default_rules else None
        return instance

    def __array_finalize__(self, parent):
        # Invoked by NumPy whenever a view/slice/copy is created: carry the
        # metadata over from the parent array when one exists.
        if parent is None:
            return
        self.default_rules = getattr(parent, "default_rules", None)

53 

54 

class RuleActivationMatrix(np.ndarray):
    """Activation matrix storing per-sample default rule metadata."""

    def __new__(cls, matrix, default_rules=None):
        # View the raw matrix as an instance of this subclass.
        instance = np.asarray(matrix, dtype=float).view(cls)
        # Note: unlike RuleActivations, only None collapses to None here —
        # an empty sequence becomes an empty tuple.
        instance.default_rules = tuple(default_rules) if default_rules is not None else None
        return instance

    def __array_finalize__(self, parent):
        # Propagate the metadata to any derived view/slice/copy.
        if parent is None:
            return
        self.default_rules = getattr(parent, "default_rules", None)

67 

68 

def save_model(model, filepath, *, compress=3):
    """Save a fitted estimator to disk with joblib.

    Parameters
    - model: the fitted estimator instance (classifier or regressor)
    - filepath: target path (str or Path-like)
    - compress: joblib compression level or bool

    Returns the path string.
    """
    target = os.fspath(filepath)
    joblib.dump(model, target, compress=compress)
    return target

83 

84 

def load_model(filepath):
    """Load a previously saved estimator created with save_model."""
    path = os.fspath(filepath)
    # NOTE: joblib deserialization uses pickle — only load trusted files.
    return joblib.load(path)

89 

90 

91# ──────────────────────────────────────────────────────────────────────────────── 

92# Base wrapper 

93# ──────────────────────────────────────────────────────────────────────────────── 

94class _FuzzyCocoBase(BaseEstimator): 

95 """Shared logic for FuzzyCocoClassifier and FuzzyCocoRegressor. 

96 

97 Provides scikit-learn compatible ``fit``/``predict``/``score`` plus 

98 utilities to inspect fuzzy rules and variables produced by the 

99 underlying C++ engine. 

100 """ 

101 

    # Subclasses override this with their default fitness-metric weights;
    # None means "no default" (consumed by __init__ when metrics_weights is omitted).
    _default_metrics_weights: dict[str, float] | None = None

103 

104 def __init__( 

105 self, 

106 nb_rules=5, 

107 nb_max_var_per_rule=3, 

108 max_generations=100, 

109 max_fitness=1.0, 

110 nb_cooperators=2, 

111 influence_rules_initial_population=False, 

112 influence_evolving_ratio=0.8, 

113 nb_sets_in=2, 

114 nb_sets_out=2, 

115 pop_size_rules=200, 

116 pop_size_mfs=200, 

117 elite_size_rules=5, 

118 elite_size_mfs=5, 

119 cx_prob_rules=0.6, 

120 cx_prob_mfs=0.9, 

121 mut_flip_genome_rules=0.4, 

122 mut_flip_genome_mfs=0.2, 

123 mut_flip_bit_rules=0.01, 

124 mut_flip_bit_mfs=0.01, 

125 nb_bits_pos_in=8, 

126 nb_bits_pos_out=8, 

127 nb_bits_vars_in=None, 

128 nb_bits_vars_out=None, 

129 nb_bits_sets_in=None, 

130 nb_bits_sets_out=None, 

131 threshold=0.5, 

132 metrics_weights=None, 

133 features_weights=None, 

134 random_state=None, 

135 ): 

136 """Initialize a FuzzyCoco estimator with explicit hyper-parameters. 

137 

138 Parameters 

139 ---------- 

140 nb_rules : int, default=5 

141 Number of fuzzy rules evolved during optimisation. 

142 nb_max_var_per_rule : int, default=3 

143 Maximum number of antecedents allowed in a rule. 

144 max_generations : int, default=100 

145 Evolution generations for both rule and membership function search. 

146 max_fitness : float, default=1.0 

147 Target fitness score that can trigger early stopping. 

148 nb_cooperators : int, default=2 

149 Number of cooperating agents in the fuzzy optimisation engine. 

150 influence_rules_initial_population : bool, default=False 

151 Whether to seed the population with rule influence heuristics. 

152 influence_evolving_ratio : float, default=0.8 

153 Ratio controlling how strongly influence is applied during evolution. 

154 nb_sets_in : int, default=2 

155 Number of linguistic sets per input variable. 

156 nb_sets_out : int, default=2 

157 Number of linguistic sets per output variable. 

158 pop_size_rules : int, default=200 

159 Population size for the rule genome evolution. 

160 pop_size_mfs : int, default=200 

161 Population size for the membership-function genome evolution. 

162 elite_size_rules : int, default=5 

163 Number of elite individuals kept each generation in the rule evolution. 

164 elite_size_mfs : int, default=5 

165 Number of elite individuals kept each generation in the membership evolution. 

166 cx_prob_rules : float, default=0.6 

167 Crossover probability for rule evolution. 

168 cx_prob_mfs : float, default=0.9 

169 Crossover probability for membership-function evolution. 

170 mut_flip_genome_rules : float, default=0.4 

171 Genome-level mutation probability for rules. 

172 mut_flip_genome_mfs : float, default=0.2 

173 Genome-level mutation probability for membership functions. 

174 mut_flip_bit_rules : float, default=0.01 

175 Bit-flip mutation probability for rules. 

176 mut_flip_bit_mfs : float, default=0.01 

177 Bit-flip mutation probability for membership functions. 

178 nb_bits_pos_in : int, default=8 

179 Bit width used to encode the positions of input membership functions. 

180 nb_bits_pos_out : int, default=8 

181 Bit width used to encode the positions of output membership functions. 

182 nb_bits_vars_in : int | None, optional 

183 Override for the automatically computed input variable bit width. 

184 nb_bits_vars_out : int | None, optional 

185 Override for the automatically computed output variable bit width. 

186 nb_bits_sets_in : int | None, optional 

187 Override for the automatically computed input set bit width. 

188 nb_bits_sets_out : int | None, optional 

189 Override for the automatically computed output set bit width. 

190 threshold : float, default=0.5 

191 Default singleton defuzzification threshold applied to each output. 

192 metrics_weights : dict[str, float] | None, optional 

193 Mapping of fitness metric names to weights. A sensible default is 

194 provided by the classifier/regressor subclasses when omitted. 

195 All unspecified metrics are set to ``0.0``; unknown keys raise 

196 ``ValueError``. 

197 

198 Valid keys: 

199 

200 - ``"sensitivity"`` — true positive rate: TP / (TP + FN) 

201 - ``"specificity"`` — true negative rate: TN / (TN + FP) 

202 - ``"accuracy"`` — (TP + TN) / (TP + TN + FP + FN) 

203 - ``"ppv"`` — precision: TP / (TP + FP) 

204 - ``"rmse"`` — root mean square error 

205 - ``"rrse"`` — root relative squared error 

206 - ``"rae"`` — relative absolute error 

207 - ``"mse"`` — mean squared error 

208 - ``"distanceThreshold"`` — normalised aggregate distance to the 

209 defuzzification threshold for correctly classified samples 

210 - ``"distanceMinThreshold"`` — average minimum per-sample distance 

211 to the threshold for correctly classified samples (confidence proxy) 

212 - ``"nb_vars"`` — complexity penalty: 1 / nb_vars where 

213 *nb_vars* is the total number of input variables used across all 

214 rules; use to discourage overly complex systems 

215 - ``"overLearn"`` — reserved, always 0 in the current engine; 

216 setting this weight has no effect 

217 - ``"true_positives"`` — raw TP count (not normalised) 

218 - ``"false_positives"`` — raw FP count (not normalised) 

219 - ``"true_negatives"`` — raw TN count (not normalised) 

220 - ``"false_negatives"`` — raw FN count (not normalised) 

221 features_weights : dict[str, float] | None, optional 

222 Optional per-feature weights used by the underlying fitness function. 

223 random_state : int | RandomState | None, optional 

224 Seed or NumPy-compatible random state for reproducibility. 

225 """ 

226 

227 self.nb_rules = nb_rules 

228 self.nb_max_var_per_rule = nb_max_var_per_rule 

229 self.max_generations = max_generations 

230 self.max_fitness = max_fitness 

231 self.nb_cooperators = nb_cooperators 

232 self.influence_rules_initial_population = influence_rules_initial_population 

233 self.influence_evolving_ratio = influence_evolving_ratio 

234 self.nb_sets_in = nb_sets_in 

235 self.nb_sets_out = nb_sets_out 

236 self.pop_size_rules = pop_size_rules 

237 self.pop_size_mfs = pop_size_mfs 

238 self.elite_size_rules = elite_size_rules 

239 self.elite_size_mfs = elite_size_mfs 

240 self.cx_prob_rules = cx_prob_rules 

241 self.cx_prob_mfs = cx_prob_mfs 

242 self.mut_flip_genome_rules = mut_flip_genome_rules 

243 self.mut_flip_genome_mfs = mut_flip_genome_mfs 

244 self.mut_flip_bit_rules = mut_flip_bit_rules 

245 self.mut_flip_bit_mfs = mut_flip_bit_mfs 

246 self.nb_bits_pos_in = nb_bits_pos_in 

247 self.nb_bits_pos_out = nb_bits_pos_out 

248 self.nb_bits_vars_in = nb_bits_vars_in 

249 self.nb_bits_vars_out = nb_bits_vars_out 

250 self.nb_bits_sets_in = nb_bits_sets_in 

251 self.nb_bits_sets_out = nb_bits_sets_out 

252 self.threshold = threshold 

253 if metrics_weights is None: 

254 default_metrics = self._default_metrics_weights 

255 metrics = None if default_metrics is None else dict(default_metrics) 

256 else: 

257 metrics = metrics_weights 

258 self.metrics_weights = metrics 

259 

260 self.features_weights = None if features_weights is None else features_weights 

261 self.random_state = random_state 

262 

263 # ────────────────────────────────────────────────────────────────────── 

264 # internal helpers 

265 # ────────────────────────────────────────────────────────────────────── 

266 def _resolve_seed(self): 

267 """Return a deterministic 32-bit seed derived from sklearn RNG.""" 

268 rng = check_random_state(self.random_state) 

269 return int(rng.randint(0, 2**32 - 1, dtype=np.uint32)) 

270 

271 def _extract_output_names(self): 

272 """Return output variable names from the stored description.""" 

273 fuzzy_desc = getattr(self, "description_", None) 

274 if not fuzzy_desc: 274 ↛ 275line 274 didn't jump to line 275 because the condition on line 274 was never true

275 return [] 

276 variables = fuzzy_desc.get("fuzzy_system", {}).get("variables", {}) 

277 outputs = variables.get("output", {}) 

278 return list(outputs.keys()) 

279 

280 @staticmethod 

281 def _rename_membership_label(label, old_var, new_var): 

282 """Rename membership labels carrying the old variable prefix.""" 

283 if not isinstance(label, str): 283 ↛ 284line 283 didn't jump to line 284 because the condition on line 283 was never true

284 return label 

285 if label == old_var: 285 ↛ 286line 285 didn't jump to line 286 because the condition on line 285 was never true

286 return new_var 

287 for sep in (".", "_", "-", " "): 287 ↛ 291line 287 didn't jump to line 291 because the loop on line 287 didn't complete

288 prefix = f"{old_var}{sep}" 

289 if label.startswith(prefix): 289 ↛ 287line 289 didn't jump to line 287 because the condition on line 289 was always true

290 return f"{new_var}{sep}{label[len(prefix):]}" 

291 return label 

292 

    def _rebuild_from_description(self):
        """Refresh cached Python helpers and fuzzy system from description.

        Re-derives every cached artifact from ``self.description_``:
        linguistic components, view components, table components, output
        names, and the memoized C++ fuzzy system. Called after the
        description is mutated (e.g. by ``set_target_names``).
        """
        # Re-parse the serialized description into the three cached representations.
        parsed = parse_fuzzy_system_from_description(self.description_)
        self.variables_, self.rules_, self.default_rules_ = to_linguistic_components(*parsed)
        self.variables_view_, self.rules_view_, self.default_rules_view_ = to_views_components(*parsed)
        self.variables_df_, self.rules_df_ = to_tables_components(*parsed)

        # Refresh the output-name bookkeeping from the new description.
        output_names = self._extract_output_names()
        self.target_names_in_ = output_names
        if output_names:
            self.target_name_in_ = output_names[0]
        self.n_outputs_ = len(output_names)

        # Invalidate the C++ FuzzySystem caches, then eagerly rebuild them.
        fuzzy_desc = self.description_.get("fuzzy_system") if self.description_ else None
        self._fuzzy_system_dict_ = copy.deepcopy(fuzzy_desc) if fuzzy_desc is not None else None
        self._fuzzy_system_string_ = None
        self._fuzzy_system_ = None
        try:
            self._ensure_fuzzy_system()
        except ModuleNotFoundError:  # pragma: no cover - happens in partial installs
            pass
        except AttributeError:  # pragma: no cover - defensive for missing bindings
            pass

        # Drop the live engine; predictions fall back to the serialized description.
        self.model_ = None

319 

320 def _normalize_target_name_change(self, names): 

321 """Normalize provided names into a mapping old->new.""" 

322 current = list(getattr(self, "target_names_in_", []) or self._extract_output_names()) 

323 if not current: 323 ↛ 324line 323 didn't jump to line 324 because the condition on line 323 was never true

324 raise RuntimeError("Estimator does not expose any output variables to rename.") 

325 

326 if isinstance(names, str): 

327 if len(current) != 1: 327 ↛ 328line 327 didn't jump to line 328 because the condition on line 327 was never true

328 raise ValueError("Provide a mapping or list when renaming multi-output models.") 

329 mapping = {current[0]: str(names)} 

330 elif isinstance(names, Mapping): 

331 mapping = {str(k): str(v) for k, v in names.items()} 

332 unknown = sorted(set(mapping) - set(current)) 

333 if unknown: 333 ↛ 334line 333 didn't jump to line 334 because the condition on line 333 was never true

334 raise ValueError(f"Unknown output variables: {', '.join(unknown)}") 

335 elif isinstance(names, Sequence): 335 ↛ 343line 335 didn't jump to line 343 because the condition on line 335 was always true

336 new_names = [str(n) for n in names] 

337 if len(new_names) != len(current): 337 ↛ 338line 337 didn't jump to line 338 because the condition on line 337 was never true

338 raise ValueError( 

339 f"Expected {len(current)} output names, got {len(new_names)}.", 

340 ) 

341 mapping = {old: new for old, new in zip(current, new_names, strict=False)} 

342 else: 

343 raise TypeError("`names` must be a string, sequence, or mapping.") 

344 

345 normalized = {old: new for old, new in mapping.items() if new and new != old} 

346 updated = [normalized.get(name, name) for name in current] 

347 if len(updated) != len(set(updated)): 

348 raise ValueError("Output names must be unique.") 

349 return normalized 

350 

    def set_target_names(self, names):
        """Rename the output variables and refresh cached structures.

        Args:
            names: String (single-output), sequence of strings matching the number
                of outputs, or a mapping ``{old_name: new_name}``.

        Returns:
            self

        Raises:
            ValueError: for names containing whitespace, unknown/duplicate names.
            RuntimeError: when the stored description lacks the fuzzy system
                or its output variables.
        """
        check_is_fitted(self, attributes=["description_", "is_fitted_"])
        mapping = self._normalize_target_name_change(names)
        # Whitespace in output names is rejected up front (presumably to keep
        # the serialized description parseable — TODO confirm).
        invalid = [target for target in mapping.values() if any(char.isspace() for char in target)]
        if invalid:
            raise ValueError(
                "Output variable names must not contain spaces: " + ", ".join(invalid),
            )
        if not mapping:
            # Nothing actually changes; skip the rebuild.
            return self

        fs = self.description_.get("fuzzy_system")
        if fs is None:
            raise RuntimeError("Estimator is missing the fuzzy system description.")

        variables = fs.get("variables", {})
        outputs = variables.get("output", {})
        if not outputs:
            raise RuntimeError("Estimator description lacks fuzzy output variables.")

        # 1) Rename the output variables and their membership-set labels.
        new_outputs = {}
        for var_name, sets in outputs.items():
            target_name = mapping.get(var_name, var_name)
            renamed_sets = {}
            for set_name, value in sets.items():
                renamed_sets[self._rename_membership_label(set_name, var_name, target_name)] = value
            new_outputs[target_name] = renamed_sets
        variables["output"] = new_outputs

        # 2) Rewrite rule antecedents/consequents that reference renamed variables.
        rules = fs.get("rules", {})
        new_rules = {}
        for rule_name, rule_def in rules.items():
            updated_rule = {}
            for key, part in rule_def.items():
                # Non-dict parts and unrelated keys are copied through untouched.
                if key not in ("antecedents", "consequents") or not isinstance(part, dict):
                    updated_rule[key] = part
                    continue
                changed_part = {}
                for var, mf_dict in part.items():
                    renamed_var = mapping.get(var, var)
                    if isinstance(mf_dict, dict):
                        renamed_mf = {
                            self._rename_membership_label(label, var, renamed_var): weight
                            for label, weight in mf_dict.items()
                        }
                    else:
                        renamed_mf = mf_dict
                    changed_part[renamed_var] = renamed_mf
                updated_rule[key] = changed_part
            new_rules[rule_name] = updated_rule
        fs["rules"] = new_rules

        # 3) Rename the default (ELSE) rules.
        defaults = fs.get("default_rules", {})
        new_defaults = {}
        for var, label in defaults.items():
            renamed_var = mapping.get(var, var)
            new_defaults[renamed_var] = self._rename_membership_label(label, var, renamed_var)
        fs["default_rules"] = new_defaults

        # 4) Rename the per-output defuzzification thresholds, when present.
        thresholds = self.description_.get("defuzz_thresholds")
        if isinstance(thresholds, dict):
            new_thresholds = {}
            for var, value in thresholds.items():
                renamed_var = mapping.get(var, var)
                new_thresholds[renamed_var] = value
            self.description_["defuzz_thresholds"] = new_thresholds

        # Rebuild all cached structures from the mutated description.
        self._rebuild_from_description()
        return self

429 

430 def _make_dataframe(self, arr, header): 

431 """Build the C++ DataFrame from a 2D numpy array and header labels.""" 

432 rows = [list(header)] + arr.astype(str).tolist() 

433 return DataFrame(rows, False) 

434 

435 def _prepare_dataframes(self, X_arr, y_arr=None, *, y_headers=None): 

436 """Create input/output DataFrame objects (output optional).""" 

437 if X_arr.ndim != 2: 437 ↛ 438line 437 didn't jump to line 438 because the condition on line 437 was never true

438 raise ValueError("X must be a 2D array") 

439 dfin = self._make_dataframe(X_arr, self.feature_names_in_) 

440 

441 if y_arr is None: 441 ↛ 442line 441 didn't jump to line 442 because the condition on line 441 was never true

442 return dfin, None 

443 

444 if y_arr.ndim == 1: 444 ↛ 445line 444 didn't jump to line 445 because the condition on line 444 was never true

445 y_arr = y_arr.reshape(-1, 1) 

446 if y_arr.shape[0] != X_arr.shape[0]: 446 ↛ 447line 446 didn't jump to line 447 because the condition on line 446 was never true

447 raise ValueError("X and y must have the same number of samples") 

448 

449 if y_headers is not None: 449 ↛ 452line 449 didn't jump to line 452 because the condition on line 449 was always true

450 headers = list(y_headers) 

451 else: 

452 headers = [f"OUT_{i + 1}" for i in range(y_arr.shape[1])] 

453 

454 dfout = self._make_dataframe(y_arr, headers) 

455 return dfin, dfout 

456 

457 def _resolve_feature_names(self, X, provided, n_features): 

458 """Resolve final feature names from DataFrame, provided list, or defaults.""" 

459 if isinstance(X, pd.DataFrame): 

460 names = list(X.columns) 

461 elif provided is not None: 

462 names = list(provided) 

463 else: 

464 names = [f"feature_{i + 1}" for i in range(n_features)] 

465 # ensure string column names for the C++ DataFrame 

466 names = [str(n) for n in names] 

467 

468 if len(names) != n_features: 

469 raise ValueError( 

470 "feature_names length does not match number of features", 

471 ) 

472 return names 

473 

474 def _resolve_target_headers(self, y, y_arr, provided): 

475 """Return (output headers, target name) inferred from y and overrides.""" 

476 if y_arr.ndim == 1: 476 ↛ 477line 476 didn't jump to line 477 because the condition on line 476 was never true

477 y_arr = y_arr.reshape(-1, 1) 

478 

479 if isinstance(y, pd.DataFrame): 479 ↛ 480line 479 didn't jump to line 480 because the condition on line 479 was never true

480 headers = list(y.columns) 

481 elif isinstance(y, pd.Series): 

482 headers = [y.name] if y.name else [] 

483 else: 

484 headers = [] 

485 

486 if not headers: 

487 if provided: 

488 if y_arr.shape[1] == 1: 488 ↛ 491line 488 didn't jump to line 491 because the condition on line 488 was always true

489 headers = [provided] 

490 else: 

491 headers = [f"{provided}_{i + 1}" for i in range(y_arr.shape[1])] 

492 else: 

493 headers = [f"OUT_{i + 1}" for i in range(y_arr.shape[1])] 

494 

495 # ensure string headers for the C++ DataFrame 

496 headers = [str(h) for h in headers] 

497 target_name = provided or (headers[0] if headers else "OUT") 

498 return headers, target_name 

499 

    def _prepare_inference_input(self, X):
        """Align/validate prediction input and build the C++ DataFrame.

        DataFrame inputs are re-ordered to ``feature_names_in_`` (extra
        columns are dropped); other inputs must already match the training
        column order.

        Returns:
            (dfin, arr): the C++ DataFrame and the validated 2D float array.

        Raises:
            ValueError: when columns are missing or the feature count differs
                from ``n_features_in_``.
        """
        if isinstance(X, pd.DataFrame):
            try:
                # Select the training columns in training order.
                aligned = X.loc[:, self.feature_names_in_]
            except KeyError as exc:
                missing = set(self.feature_names_in_) - set(X.columns)
                raise ValueError(
                    f"Missing features in input data: {sorted(missing)}",
                ) from exc
            raw = aligned.to_numpy(dtype=float)
        else:
            raw = np.asarray(X, dtype=float)

        arr = check_array(raw, accept_sparse=False, ensure_2d=True, dtype=float)
        if arr.shape[1] != self.n_features_in_:
            raise ValueError(
                f"X has {arr.shape[1]} features, but {self.__class__.__name__} \
is expecting {self.n_features_in_} features as input",
            )

        dfin = self._make_dataframe(arr, self.feature_names_in_)
        return dfin, arr

523 

    def _ensure_fuzzy_system(self):
        """Rebuild and memoize the C++ FuzzySystem from the saved description.

        Resolution order: the memoized ``_fuzzy_system_`` instance, then the
        serialized-string cache ``_fuzzy_system_string_``, then the dict form
        (``_fuzzy_system_dict_`` or ``description_["fuzzy_system"]``).

        Raises:
            RuntimeError: when no fuzzy system description is available.
        """
        # Fast path: already built and memoized.
        if getattr(self, "_fuzzy_system_", None) is not None:
            return self._fuzzy_system_

        serialized = getattr(self, "_fuzzy_system_string_", None)
        if not serialized:
            desc = getattr(self, "_fuzzy_system_dict_", None)
            if desc is None:
                if not hasattr(self, "description_"):
                    raise RuntimeError("Estimator is missing the fuzzy system description")
                desc = self.description_.get("fuzzy_system") if self.description_ else None
                if desc is None:
                    raise RuntimeError("Estimator does not contain a fuzzy system description")
                # Cache a private deep copy so later mutations of
                # description_ don't leak into the cache.
                desc = copy.deepcopy(desc)
                self._fuzzy_system_dict_ = desc
            if isinstance(desc, dict):
                from . import _fuzzycoco_core  # local import to avoid cycles

                serialized = _fuzzycoco_core._named_list_from_dict_to_string(desc)
            else:
                serialized = str(desc)
            self._fuzzy_system_string_ = serialized

        self._fuzzy_system_ = FuzzySystem.load_from_string(serialized)
        return self._fuzzy_system_

550 

551 def _fuzzy_system_description(self): 

552 """Return the fuzzy system section of the saved description (cached).""" 

553 desc = getattr(self, "_fuzzy_system_dict_", None) 

554 if isinstance(desc, Mapping): 554 ↛ 557line 554 didn't jump to line 557 because the condition on line 554 was always true

555 return desc 

556 

557 root = getattr(self, "description_", None) 

558 if isinstance(root, Mapping): 

559 fs = root.get("fuzzy_system") 

560 if isinstance(fs, Mapping): 

561 self._fuzzy_system_dict_ = copy.deepcopy(fs) 

562 return self._fuzzy_system_dict_ 

563 return None 

564 

565 def _default_rule_names(self): 

566 """Return the ordered list of default rule names defined in the system.""" 

567 desc = self._fuzzy_system_description() 

568 if isinstance(desc, Mapping): 568 ↛ 572line 568 didn't jump to line 572 because the condition on line 568 was always true

569 defaults = desc.get("default_rules") 

570 if isinstance(defaults, Mapping): 570 ↛ 572line 570 didn't jump to line 572 because the condition on line 570 was always true

571 return list(defaults.keys()) 

572 return [] 

573 

    def _default_rule_labels(self):
        """Human-readable labels for default rules, aligned with _default_rule_names.

        Returns one label per default rule, of the form
        ``"ELSE <var> is <SetLabel>"`` when the membership set can be resolved,
        otherwise ``"default_<var>"``.
        """
        names = self._default_rule_names()
        if not names:
            return []

        desc = self._fuzzy_system_description()
        defaults = desc.get("default_rules") if isinstance(desc, Mapping) else None
        variables = desc.get("variables", {}) if isinstance(desc, Mapping) else {}
        outputs = variables.get("output", {}) if isinstance(variables, Mapping) else {}

        # Map each output's membership sets (ordered by their stored value)
        # to generic labels, per output variable.
        label_lookup: dict[str, dict[str, str]] = {}
        for var, sets in outputs.items():
            if not isinstance(sets, Mapping):
                continue
            items = sorted(sets.items(), key=lambda kv: kv[1])
            labels = generate_generic_labels(len(items))
            var_map = label_lookup.setdefault(var, {})
            for (orig_set, _pos), label in zip(items, labels, strict=False):
                var_map[orig_set] = label

        labels: list[str] = []
        for name in names:
            pretty = None
            if isinstance(defaults, Mapping):
                set_key = defaults.get(name)
                if isinstance(set_key, str):
                    pretty = label_lookup.get(name, {}).get(set_key)
                    if pretty is None:
                        # Fallback: prettify the raw set name, e.g.
                        # "var.low_set" -> "Low Set".
                        suffix = set_key.split(".")[-1] if "." in set_key else set_key
                        suffix = suffix.replace("_", " ").strip()
                        if suffix:
                            pretty = suffix.title()
                    if pretty:
                        pretty = f"ELSE {name} is {pretty}"
            labels.append(pretty or f"default_{name}")
        return labels

611 

    def _default_rule_activations_from_levels(self, rule_levels):
        """Compute default rule fallbacks based solely on rule fire levels.

        For each output variable with a default rule, the activation is
        ``1 - max(fire level over rules whose consequents target it)``,
        with the max clipped to [0, 1]. If no (finite, non-missing) rule
        fired for an output, its activation is 0.0.

        Args:
            rule_levels: 1D array of per-rule fire levels, in rule order.

        Returns:
            dict ``{output_name: activation}``, or ``None`` when the stored
            description lacks the required sections or its rule count does
            not match ``rule_levels``.
        """
        desc = self._fuzzy_system_description()
        if not isinstance(desc, Mapping):
            return None

        defaults = desc.get("default_rules")
        rules_desc = desc.get("rules")
        if not isinstance(defaults, Mapping) or not isinstance(rules_desc, Mapping):
            return None

        default_names = list(defaults.keys())
        if not default_names:
            return None

        # Collect, in rule order, which output variables each rule's
        # consequents target.
        rule_outputs: list[tuple[str, ...]] = []
        for rule_def in rules_desc.values():
            if isinstance(rule_def, Mapping):
                consequents = rule_def.get("consequents")
                if isinstance(consequents, Mapping):
                    rule_outputs.append(tuple(consequents.keys()))
                else:
                    rule_outputs.append(tuple())
            else:
                rule_outputs.append(tuple())

        # Descriptions and levels must describe the same rule set.
        if len(rule_outputs) != len(rule_levels):
            return None

        # Track the strongest valid fire level per default-rule output;
        # NaN/inf and missing-data sentinel values are skipped.
        max_fire_by_var = {name: None for name in default_names}
        for fire_level, targets in zip(rule_levels.tolist(), rule_outputs, strict=False):
            if not np.isfinite(fire_level) or fire_level <= _MISSING_DATA_DOUBLE:
                continue
            for var in targets:
                if var not in max_fire_by_var:
                    continue
                current = max_fire_by_var[var]
                if current is None or fire_level > current:
                    max_fire_by_var[var] = float(fire_level)

        result: dict[str, float] = {}
        for name in default_names:
            max_fire = max_fire_by_var[name]
            if max_fire is None:
                # No rule fired for this output at all.
                result[name] = 0.0
            else:
                clipped = float(np.clip(max_fire, 0.0, 1.0))
                result[name] = max(0.0, 1.0 - clipped)
        return result

661 

662 def _predict_dataframe(self, dfin): 

663 """Predict using the live engine when available, else via saved description.""" 

664 model = getattr(self, "model_", None) 

665 if model is not None: 

666 return model.predict(dfin) 

667 from . import _fuzzycoco_core # local import to avoid circular deps 

668 

669 if not getattr(self, "description_", None): 669 ↛ 670line 669 didn't jump to line 670 because the condition on line 669 was never true

670 raise RuntimeError("Missing model description for prediction") 

671 return _fuzzycoco_core.FuzzyCoco.load_and_predict_from_dict(dfin, self.description_) 

672 

673 def _rule_activations_from_sample_values(self, sample_values): 

674 """Compute RuleActivations for a validated 1D list of floats.""" 

675 values = self._compute_rule_fire_levels(sample_values) 

676 default_map = self._default_rule_activations_from_levels(values) 

677 return RuleActivations(values, default_map) 

678 

679 def _compute_rule_fire_levels(self, sample): 

680 """Compute rule activations for a single sample (1D).""" 

681 model = getattr(self, "model_", None) 

682 if model is not None: 

683 values = model.rules_fire_from_values(sample) 

684 else: 

685 from . import _fuzzycoco_core 

686 

687 values_matrix = _fuzzycoco_core._rules_fire_matrix_from_description( 

688 self.description_, 

689 [sample], 

690 ) 

691 values = values_matrix[0] 

692 return np.asarray(values, dtype=float) 

693 

694 def _validate_fit_kwargs(self, fit_params): 

695 params = dict(fit_params) 

696 feature_names = params.pop("feature_names", None) 

697 target_name = params.pop("target_name", None) 

698 params.pop("output_filename", None) 

699 if params: 699 ↛ 700line 699 didn't jump to line 700 because the condition on line 699 was never true

700 unexpected = ", ".join(sorted(params)) 

701 raise TypeError(f"Unexpected fit parameters: {unexpected}") 

702 return feature_names, target_name 

703 

704 # ────────────────────────────────────────────────────────────────────── 

705 # public API 

706 # ────────────────────────────────────────────────────────────────────── 

707 def fit(self, X, y, **fit_params): 

708 """Fit a fuzzy rule-based model. 

709 

710 Args: 

711 X: 2D array-like or pandas DataFrame of shape (n_samples, n_features). 

712 y: 1D or 2D array-like or pandas Series/DataFrame with targets. 

713 **fit_params: Optional keyword-only parameters: 

714 - ``feature_names``: list of column names to use when ``X`` is not a DataFrame. 

715 - ``target_name``: name of the output variable in the fuzzy system. 

716 

717 Returns: 

718 The fitted estimator instance. 

719 """ 

720 feature_names, target_name = self._validate_fit_kwargs(fit_params) 

721 return self._fit_internal( 

722 X, 

723 y, 

724 feature_names=feature_names, 

725 target_name=target_name, 

726 callback=None, 

727 max_generations=None, 

728 max_fitness=None, 

729 influence=None, 

730 evolving_ratio=None, 

731 ) 

732 

733 def fit_stepwise( 

734 self, 

735 X, 

736 y, 

737 *, 

738 callback: Callable[[FitStepInfo], object] | None = None, 

739 max_generations: int | None = None, 

740 max_fitness: float | None = None, 

741 influence: bool | None = None, 

742 evolving_ratio: float | None = None, 

743 **fit_params, 

744 ): 

745 """Fit while exposing each training generation to a callback. 

746 

747 The provided ``callback`` receives a :class:`FitStepInfo` instance 

748 after every generation. Returning ``False`` from the callback stops 

749 the evolution early; any other return value continues the loop. 

750 """ 

751 feature_names, target_name = self._validate_fit_kwargs(fit_params) 

752 return self._fit_internal( 

753 X, 

754 y, 

755 feature_names=feature_names, 

756 target_name=target_name, 

757 callback=callback, 

758 max_generations=max_generations, 

759 max_fitness=max_fitness, 

760 influence=influence, 

761 evolving_ratio=evolving_ratio, 

762 ) 

763 

    def _fit_internal(
        self,
        X,
        y,
        *,
        feature_names,
        target_name,
        callback,
        max_generations,
        max_fitness,
        influence,
        evolving_ratio,
    ):
        """Shared fitting pipeline behind :meth:`fit` and :meth:`fit_stepwise`.

        Validates inputs, builds the native parameter object, instantiates the
        FuzzyCoco engine, runs the evolutionary loop, and finalizes fitted
        attributes. Returns ``self``.
        """
        # Joint X/y validation; multi-output targets are allowed.
        X_arr, y_arr = check_X_y(
            X,
            y,
            multi_output=True,
            accept_sparse=False,
            ensure_2d=True,
            dtype=float,
        )

        self.feature_names_in_ = self._resolve_feature_names(X, feature_names, X_arr.shape[1])
        self.n_features_in_ = len(self.feature_names_in_)

        # Normalise targets to 2D (n_samples, n_outputs) and resolve their names.
        y_arr = np.asarray(y_arr, dtype=float)
        y_2d = y_arr.reshape(-1, 1) if y_arr.ndim == 1 else y_arr
        y_headers, resolved_target = self._resolve_target_headers(y, y_2d, target_name)
        self.target_name_in_ = resolved_target
        self.target_names_in_ = list(y_headers)
        self.n_outputs_ = y_2d.shape[1]

        # Fall back to the subclass default fitness weights when not provided.
        metrics_weights = self.metrics_weights
        if metrics_weights is None:
            metrics_weights = self._default_metrics_weights

        # Translate the flat sklearn-style hyperparameters into the native
        # FuzzyCocoParams object expected by the engine.
        params_obj = build_fuzzycoco_params(
            nb_features=self.n_features_in_,
            n_outputs=self.n_outputs_,
            nb_rules=self.nb_rules,
            nb_max_var_per_rule=self.nb_max_var_per_rule,
            max_generations=self.max_generations,
            max_fitness=self.max_fitness,
            nb_cooperators=self.nb_cooperators,
            influence_rules_initial_population=self.influence_rules_initial_population,
            influence_evolving_ratio=self.influence_evolving_ratio,
            nb_sets_in=self.nb_sets_in,
            nb_sets_out=self.nb_sets_out,
            nb_bits_pos_in=self.nb_bits_pos_in,
            nb_bits_pos_out=self.nb_bits_pos_out,
            nb_bits_vars_in=self.nb_bits_vars_in,
            nb_bits_vars_out=self.nb_bits_vars_out,
            nb_bits_sets_in=self.nb_bits_sets_in,
            nb_bits_sets_out=self.nb_bits_sets_out,
            pop_size_rules=self.pop_size_rules,
            elite_size_rules=self.elite_size_rules,
            cx_prob_rules=self.cx_prob_rules,
            mut_flip_genome_rules=self.mut_flip_genome_rules,
            mut_flip_bit_rules=self.mut_flip_bit_rules,
            pop_size_mfs=self.pop_size_mfs,
            elite_size_mfs=self.elite_size_mfs,
            cx_prob_mfs=self.cx_prob_mfs,
            mut_flip_genome_mfs=self.mut_flip_genome_mfs,
            mut_flip_bit_mfs=self.mut_flip_bit_mfs,
            threshold=self.threshold,
            metrics_weights=metrics_weights,
            features_weights=self.features_weights,
        )

        if hasattr(params_obj, "fitness_params"):
            # Align the per-output thresholds with the actual output count.
            params_obj.fitness_params.fix_output_thresholds(self.n_outputs_)
        self._fuzzy_params_ = params_obj

        # Build native dataframes, seed the engine RNG, and instantiate the engine.
        dfin, dfout = self._prepare_dataframes(X_arr, y_2d, y_headers=y_headers)
        rng = RandomGenerator(self._resolve_seed())
        self.model_ = FuzzyCoco(dfin, dfout, params_obj, rng)
        self._fitness_trace = []

        self._run_training_loop(
            callback=callback,
            max_generations=max_generations,
            max_fitness=max_fitness,
            influence=influence,
            evolving_ratio=evolving_ratio,
        )

        self._finalize_after_training()
        return self

852 

    def _run_training_loop(
        self,
        *,
        callback: Callable[[FitStepInfo], object] | None,
        max_generations: int | None,
        max_fitness: float | None,
        influence: bool | None,
        evolving_ratio: float | None,
    ):
        """Run the generational evolution loop on the prepared engine.

        ``None`` overrides fall back to the fitted parameter object's globals.
        The loop stops after ``max_generations`` steps, when the callback
        returns ``False``, or once fitness reaches the target.
        """
        if self.model_ is None:
            raise RuntimeError("Training model has not been initialised")
        if not hasattr(self, "_fuzzy_params_") or self._fuzzy_params_ is None:
            raise RuntimeError("Missing FuzzyCoco parameters; call fit first")

        global_params = self._fuzzy_params_.global_params

        # Per-call overrides take precedence over the configured globals.
        total_generations = global_params.max_generations if max_generations is None else max_generations
        if total_generations is None:
            total_generations = 0
        total_generations = int(total_generations)
        if total_generations < 0:
            raise ValueError("max_generations must be non-negative")

        target_fitness = global_params.max_fitness if max_fitness is None else max_fitness
        if target_fitness is None:
            # No fitness target configured: never stop early on fitness.
            target_fitness = float("inf")
        else:
            target_fitness = float(target_fitness)

        influence_flag = global_params.influence_rules_initial_population if influence is None else bool(influence)
        evolving_ratio_value = (
            global_params.influence_evolving_ratio if evolving_ratio is None else float(evolving_ratio)
        )

        model = self.model_
        # Engine populations must be initialised before any step() call.
        model.init(influence=influence_flag, evolving_ratio=evolving_ratio_value)

        if total_generations == 0:
            return

        for _ in range(total_generations):
            fitness = float(model.step())
            generation = int(model.current_generation())
            self._fitness_trace.append(fitness)

            if callback is not None:
                # Expose an immutable-history snapshot; only an explicit
                # False return stops the loop.
                step_info = FitStepInfo(
                    generation=generation,
                    fitness=fitness,
                    history=tuple(self._fitness_trace),
                    model=model,
                    estimator=self,
                )
                should_continue = callback(step_info)
                if should_continue is False:
                    break

            # Early stop once the fitness target is reached (inf = never).
            if np.isfinite(target_fitness) and fitness >= target_fitness:
                break

912 

    def _finalize_after_training(self):
        """Select the best individual and materialise all fitted attributes."""
        if self.model_ is None:
            raise RuntimeError("Training model has not been initialised")

        self.model_.select_best()
        self.description_ = self.model_.describe()

        fuzzy_system_desc = self.description_.get("fuzzy_system")
        if fuzzy_system_desc is None:
            raise RuntimeError("Model description missing 'fuzzy_system' section")

        # Keep dict, string, and live-object forms of the fuzzy system; the
        # dict/string forms survive pickling, the object form does not.
        self._fuzzy_system_dict_ = copy.deepcopy(fuzzy_system_desc)
        self._fuzzy_system_string_ = self.model_.serialize_fuzzy_system()
        self._fuzzy_system_ = FuzzySystem.load_from_string(self._fuzzy_system_string_)

        # Derive the user-facing linguistic components/views/tables once.
        parsed = parse_fuzzy_system_from_description(self.description_)
        self.variables_, self.rules_, self.default_rules_ = to_linguistic_components(*parsed)
        self.variables_view_, self.rules_view_, self.default_rules_view_ = to_views_components(*parsed)
        self.variables_df_, self.rules_df_ = to_tables_components(*parsed)

        self.fitness_history_ = np.asarray(self._fitness_trace, dtype=float)
        self.n_generations_run_ = int(self.model_.current_generation())

        self.is_fitted_ = True

937 

938 def predict(self, X): 

939 """Predict outputs for ``X``. 

940 

941 Implemented by subclasses; here only to document the public API. 

942 

943 Args: 

944 X: 2D array-like or pandas DataFrame aligned with ``feature_names_in_``. 

945 

946 Returns: 

947 ndarray of predictions; shape depends on the specific estimator. 

948 """ 

949 raise NotImplementedError 

950 

951 def score(self, X, y, scoring=None): 

952 """Compute a default metric on the given test data. 

953 

954 Args: 

955 X: Test features. 

956 y: True targets. 

957 scoring: Optional scikit-learn scorer string or callable. If ``None``, 

958 uses ``"accuracy"`` for classifiers and ``"r2"`` for regressors. 

959 

960 Returns: 

961 The score as a float. 

962 """ 

963 scorer = get_scorer(scoring or self._default_scorer) 

964 return scorer(self, X, y) 

965 

966 def rules_activations(self, X): 

967 """Compute rule activation levels for a single sample. 

968 

969 Args: 

970 X: Single sample as 1D array-like, pandas Series, or single-row DataFrame. 

971 

972 Returns: 

973 1D numpy array (``RuleActivations``) of length ``n_rules`` with fire levels 

974 in [0, 1]. The array exposes a ``default_rules`` attribute containing a 

975 dict mapping each default rule (one per output variable) to its computed 

976 fallback activation. 

977 """ 

978 check_is_fitted(self, attributes=["model_"]) 

979 sample = self._as_1d_sample(X) 

980 if len(sample) != self.n_features_in_: 980 ↛ 981line 980 didn't jump to line 981 because the condition on line 980 was never true

981 raise ValueError( 

982 f"Expected {self.n_features_in_} features, got {len(sample)}", 

983 ) 

984 return self._rule_activations_from_sample_values(sample) 

985 

    def rules_stat_activations(self, X, threshold=1e-12, return_matrix=False, sort_by_impact=True):
        """Compute aggregate rule activations for a batch of samples.

        Args:
            X: 2D array-like or DataFrame of samples to analyze.
            threshold: Minimum activation value to count a rule as "used".
            return_matrix: If True, also return the (n_samples, n_rules) activation matrix.
            sort_by_impact: If True, sort the summary by estimated impact.

        Returns:
            If ``return_matrix`` is False, a pandas DataFrame with per-rule statistics
            (mean, std, min, max, usage rates, and impact). If True, returns a tuple
            ``(stats_df, activations_matrix)`` where ``activations_matrix`` is a
            ``RuleActivationMatrix`` carrying a ``default_rules`` attribute that stores,
            for each sample, a dict of fallback activations for the default rules.
        """

        check_is_fitted(self, attributes=["model_"])

        # Reorder DataFrame columns to training order; fail fast on missing ones.
        if isinstance(X, pd.DataFrame):
            try:
                arr_raw = X.loc[:, self.feature_names_in_].to_numpy(dtype=float)
            except KeyError as exc:
                missing = set(self.feature_names_in_) - set(X.columns)
                raise ValueError(
                    f"Missing features in input data: {sorted(missing)}",
                ) from exc
        else:
            arr_raw = np.asarray(X, dtype=float)

        arr = check_array(arr_raw, accept_sparse=False, ensure_2d=True, dtype=float)
        if arr.shape[0] == 0:
            raise ValueError("Empty X.")
        if arr.shape[1] != self.n_features_in_:
            raise ValueError(
                f"Expected {self.n_features_in_} features, got {arr.shape[1]}",
            )

        # Compute per-sample rule fire levels plus optional default-rule metadata.
        activation_rows = []
        default_rows = []
        has_default_payload = False
        for row in arr:
            sample_values = row.astype(float).tolist()
            values = self._rule_activations_from_sample_values(sample_values)
            activation_rows.append(np.asarray(values, dtype=float))
            default_rows.append(values.default_rules)
            has_default_payload |= values.default_rules is not None

        activations_matrix = np.vstack(activation_rows)
        default_payload = tuple(default_rows) if has_default_payload else None

        # Densify the per-sample default-rule dicts into a float matrix
        # (missing entries stay 0.0).
        default_matrix = None
        default_names = self._default_rule_names() if has_default_payload else []
        default_labels = self._default_rule_labels() if has_default_payload else []
        if has_default_payload and default_names:
            default_matrix = np.zeros((activations_matrix.shape[0], len(default_names)), dtype=float)
            for sample_idx, payload in enumerate(default_rows):
                if payload is None:
                    continue
                for name_idx, name in enumerate(default_names):
                    value = payload.get(name)
                    if value is None:
                        continue
                    default_matrix[sample_idx, name_idx] = float(value)

        # Append default-rule columns (when present) so statistics cover them too.
        if default_matrix is not None and default_names:
            combined_matrix = np.hstack([activations_matrix, default_matrix])
            default_idx_labels = default_labels or [f"default_{name}" for name in default_names]
            matrix_labels = self._rules_index(activations_matrix.shape[1]) + default_idx_labels
        else:
            combined_matrix = activations_matrix
            matrix_labels = self._rules_index(combined_matrix.shape[1])

        activations = RuleActivationMatrix(combined_matrix, default_payload)
        activations_view = np.asarray(activations)

        # Per-sample share of total activation; rows summing to 0 stay all-zero.
        sums = activations_view.sum(axis=1, keepdims=True)
        share = np.divide(
            activations_view,
            sums,
            out=np.zeros_like(activations_view),
            where=sums > 0,
        )

        usage_rate = (activations_view >= threshold).mean(axis=0)
        usage_rate_pct = 100.0 * usage_rate
        importance_pct = 100.0 * share.mean(axis=0)
        # Impact blends how often a rule fires with how much it contributes.
        impact_pct = usage_rate * importance_pct

        stats = pd.DataFrame(
            {
                "mean": activations_view.mean(axis=0),
                "std": activations_view.std(axis=0),
                "min": activations_view.min(axis=0),
                "max": activations_view.max(axis=0),
                "usage_rate": usage_rate,
                "usage_rate_pct": usage_rate_pct,
                "importance_pct": importance_pct,
                "impact_pct": impact_pct,
            },
            index=matrix_labels,
        )

        if sort_by_impact:
            stats = stats.sort_values("impact_pct", ascending=False)

        return (stats, activations) if return_matrix else stats

1093 

1094 # ---- helpers ---- 

1095 def _as_1d_sample(self, X): 

1096 """Normalize various single‑row inputs (array/Series/DF) to a 1D list.""" 

1097 if isinstance(X, pd.Series): 1097 ↛ 1098line 1097 didn't jump to line 1098 because the condition on line 1097 was never true

1098 aligned = X.reindex(self.feature_names_in_) 

1099 if aligned.isnull().any(): 

1100 missing = aligned[aligned.isnull()].index.tolist() 

1101 raise ValueError(f"Missing features in sample: {missing}") 

1102 arr = aligned.to_numpy(dtype=float) 

1103 elif isinstance(X, pd.DataFrame): 

1104 if len(X) != 1: 1104 ↛ 1105line 1104 didn't jump to line 1105 because the condition on line 1104 was never true

1105 raise ValueError("Provide a single-row DataFrame for rules_activations.") 

1106 try: 

1107 arr = X.loc[:, self.feature_names_in_].to_numpy(dtype=float)[0] 

1108 except KeyError as exc: 

1109 missing = set(self.feature_names_in_) - set(X.columns) 

1110 raise ValueError( 

1111 f"Missing features in sample: {sorted(missing)}", 

1112 ) from exc 

1113 else: 

1114 arr = np.asarray(X, dtype=float) 

1115 if arr.ndim == 2 and arr.shape[0] == 1: 1115 ↛ 1116line 1115 didn't jump to line 1116 because the condition on line 1115 was never true

1116 arr = arr[0] 

1117 elif arr.ndim != 1: 1117 ↛ 1118line 1117 didn't jump to line 1118 because the condition on line 1117 was never true

1118 raise ValueError( 

1119 "rules_activations expects a 1D sample or single-row 2D array.", 

1120 ) 

1121 

1122 if arr.shape[0] != self.n_features_in_: 1122 ↛ 1123line 1122 didn't jump to line 1123 because the condition on line 1122 was never true

1123 raise ValueError( 

1124 f"Expected {self.n_features_in_} features, got {arr.shape[0]}", 

1125 ) 

1126 

1127 return arr.astype(float).tolist() 

1128 

1129 def _rules_index(self, n_rules): 

1130 names = getattr(self, "rules_", None) 

1131 if isinstance(names, list | tuple) and len(names) == n_rules: 1131 ↛ 1133line 1131 didn't jump to line 1133 because the condition on line 1131 was always true

1132 return list(names) 

1133 return [f"rule_{i}" for i in range(n_rules)] 

1134 

1135 def __getstate__(self): 

1136 state = self.__dict__.copy() 

1137 state.pop("model_", None) 

1138 state.pop("_fuzzy_system_", None) 

1139 params = state.get("_fuzzy_params_") 

1140 if isinstance(params, FuzzyCocoParams): 1140 ↛ 1142line 1140 didn't jump to line 1142 because the condition on line 1140 was always true

1141 state["_fuzzy_params_"] = copy.deepcopy(params.describe()) 

1142 return state 

1143 

    def __setstate__(self, state):
        """Restore pickled state and rebuild what __getstate__ dropped."""
        params = state.get("_fuzzy_params_")
        if isinstance(params, dict):
            # __getstate__ stored params as a plain dict; rebuild the native object.
            state["_fuzzy_params_"] = FuzzyCocoParams.from_dict(params)
        self.__dict__.update(state)

        # Backfill target names from the saved description when absent
        # (e.g. models pickled by an older version).
        output_names = []
        if getattr(self, "description_", None):
            output_names = self._extract_output_names()
        if not getattr(self, "target_names_in_", None):
            self.target_names_in_ = output_names
        if output_names and not getattr(self, "target_name_in_", None):
            self.target_name_in_ = output_names[0]

        # Live engine handles are never pickled; start from None.
        self.model_ = None
        self._fuzzy_system_ = None
        if getattr(self, "_fuzzy_system_dict_", None) is None and getattr(self, "description_", None):
            fuzzy_desc = self.description_.get("fuzzy_system") if self.description_ else None
            if fuzzy_desc is not None:
                self._fuzzy_system_dict_ = copy.deepcopy(fuzzy_desc)
        if state.get("is_fitted_", False):
            # Rehydrate the fuzzy system object for fitted estimators.
            self._ensure_fuzzy_system()

1166 

1167 def save(self, filepath, *, compress=3): 

1168 """Save this fitted estimator to disk (convenience wrapper). 

1169 

1170 Args: 

1171 filepath: Destination path for the serialized estimator. 

1172 compress: Joblib compression parameter. 

1173 

1174 Returns: 

1175 The path used to save the model. 

1176 """ 

1177 return save_model(self, filepath, compress=compress) 

1178 

1179 @classmethod 

1180 def load(cls, filepath): 

1181 """Load a previously saved estimator instance of this class. 

1182 

1183 Args: 

1184 filepath: Path to the serialized estimator created via :meth:`save`. 

1185 

1186 Returns: 

1187 An instance of the estimator loaded from disk. 

1188 """ 

1189 model = load_model(filepath) 

1190 if not isinstance(model, cls): 

1191 raise TypeError( 

1192 f"Expected instance of {cls.__name__}, got {type(model).__name__}", 

1193 ) 

1194 return model 

1195 

1196 def describe(self): 

1197 """Return the full model description (variables, rules, defaults). 

1198 

1199 Returns: 

1200 A dictionary mirroring the native engine description, including 

1201 the serialized fuzzy system and related metadata. 

1202 """ 

1203 return self.description_ 

1204 

1205 

1206# ──────────────────────────────────────────────────────────────────────────────── 

1207# Classifier wrapper 

1208# ──────────────────────────────────────────────────────────────────────────────── 

class FuzzyCocoClassifier(ClassifierMixin, FuzzyCocoPlotMixin, _FuzzyCocoBase):
    """Fuzzy rule-based classifier with a scikit-learn compatible API."""

    _default_scorer = "accuracy"
    _default_metrics_weights = {"accuracy": 1.0}

    def _store_classes(self, y):
        """Record per-output class labels from ``y``.

        Previously duplicated verbatim in :meth:`fit` and :meth:`fit_stepwise`;
        factored out so both entry points stay consistent.
        """
        y_arr = np.asarray(y)
        if y_arr.ndim == 1:
            self.classes_ = np.unique(y_arr)
        else:
            # Multi-output: one label array per output column.
            self.classes_ = [np.unique(y_arr[:, i]) for i in range(y_arr.shape[1])]

    def fit(self, X, y, **kwargs):
        """Fit the classifier on ``X`` and ``y``.

        See :meth:`_FuzzyCocoBase.fit` for details on accepted parameters.
        """
        self._store_classes(y)
        return super().fit(X, y, **kwargs)

    def fit_stepwise(
        self,
        X,
        y,
        *,
        callback=None,
        max_generations=None,
        max_fitness=None,
        influence=None,
        evolving_ratio=None,
        **fit_params,
    ):
        """Fit generation-by-generation; see :meth:`_FuzzyCocoBase.fit_stepwise`."""
        self._store_classes(y)
        return super().fit_stepwise(
            X,
            y,
            callback=callback,
            max_generations=max_generations,
            max_fitness=max_fitness,
            influence=influence,
            evolving_ratio=evolving_ratio,
            **fit_params,
        )

    def predict(self, X):
        """Predict class labels for ``X``.

        The engine emits continuous scores; each is rounded and clipped to a
        valid class index, then mapped back to the original label dtype.
        """
        check_is_fitted(self, attributes=["model_"])
        dfin, _ = self._prepare_inference_input(X)
        preds_df = self._predict_dataframe(dfin)
        raw = preds_df.to_list()  # list of rows

        if isinstance(self.classes_[0], np.ndarray) or isinstance(self.classes_, list):
            # Multi-output: classes_ is a list of per-output label arrays.
            n_outputs = len(self.classes_)
            y_pred = np.empty((len(raw), n_outputs), dtype=self.classes_[0].dtype)
            for i, row in enumerate(raw):
                for j, val in enumerate(row[:n_outputs]):
                    idx = int(round(val))
                    idx = np.clip(idx, 0, len(self.classes_[j]) - 1)
                    y_pred[i, j] = self.classes_[j][idx]
            if n_outputs == 1:
                return y_pred.ravel()
            return y_pred
        else:
            # Single output path: classes_ is a flat ndarray of labels.
            y_pred_idx = np.array([int(round(v[0])) for v in raw])
            y_pred_idx = np.clip(y_pred_idx, 0, len(self.classes_) - 1)
            return self.classes_[y_pred_idx]

1279 

1280 

1281# ──────────────────────────────────────────────────────────────────────────────── 

1282# Regressor wrapper 

1283# ──────────────────────────────────────────────────────────────────────────────── 

class FuzzyCocoRegressor(RegressorMixin, FuzzyCocoPlotMixin, _FuzzyCocoBase):
    """Fuzzy rule-based regressor with a scikit-learn compatible API."""

    _default_scorer = "r2"
    _default_metrics_weights = {"rmse": 1.0}

    def predict(self, X):
        """Predict continuous targets for ``X``.

        Returns a 1D array for single-output models, 2D for multi-output.
        """
        check_is_fitted(self, attributes=["model_"])
        dfin, _ = self._prepare_inference_input(X)
        preds_df = self._predict_dataframe(dfin)
        predictions = np.asarray(preds_df.to_list(), dtype=float)
        if predictions.shape[1] == 1:
            return predictions.ravel()
        return predictions

1297 return raw.ravel() if raw.shape[1] == 1 else raw