From c16e910c14f3414df375de964ae797f7944b8215 Mon Sep 17 00:00:00 2001 From: David GAILLETON Date: Wed, 11 Feb 2026 18:02:15 +0100 Subject: [PATCH] 05/ex1 --- 05/ex0/stream_processor.py | 160 ++++++++++++++++++++++ 05/ex1/data_stream.py | 265 +++++++++++++++++++++++++++++++++++++ 2 files changed, 425 insertions(+) create mode 100644 05/ex0/stream_processor.py create mode 100644 05/ex1/data_stream.py diff --git a/05/ex0/stream_processor.py b/05/ex0/stream_processor.py new file mode 100644 index 0000000..f92502e --- /dev/null +++ b/05/ex0/stream_processor.py @@ -0,0 +1,160 @@ +from abc import ABC, abstractmethod +from typing import Any +from typing_extensions import override + + +class DataProcessor(ABC): + @abstractmethod + def process(self, data: Any) -> str: + pass + + @abstractmethod + def validate(self, data: Any) -> bool: + pass + + def format_output(self, result: str) -> str: + return f"Processed output: {result}" + + +class NumericProcessor(DataProcessor): + def process(self, data: Any) -> str: + try: + return f"{data}" + except Exception as err: + print(err) + return "error" + + def validate(self, data: Any) -> bool: + try: + for n in data: + if type(n).__name__ != "int": + return False + return True + except Exception: + return False + + @override + def format_output(self, result: str) -> str: + try: + to_processed = result.split(", ") + res_int: list[int] = [] + for n in to_processed: + n = n.replace("[", "").replace("]", "") + if n.isnumeric(): + res_int = res_int + [int(n)] + avg = sum(res_int) / len(res_int) + return f"Processed {len(res_int)} numeric value,\ + sum={sum(res_int)}, avg={avg:.2f}" + except Exception as err: + print(f"NumericProcessor / format_output: {err}") + return "error" + + +class TextProcessor(DataProcessor): + def process(self, data: Any) -> str: + return str(data) + + def validate(self, data: Any) -> bool: + try: + for char in data: + if type(char).__name__ != "str": + return False + return True + except Exception: + return False + + @override + def format_output(self, result: str) -> str: + return f"Processed text: {len(result)}\ + characters, {len(result.split(' '))} words" + + +class LogProcessor(DataProcessor): + def process(self, data: Any) -> str: + try: + return f"{data[0]}: {data[1]}" + except Exception as err: + print(err) + return "error" + + def validate(self, data: Any) -> bool: + try: + if len(data) == 2: + for n in data: + if type(n).__name__ != "str": + return False + return True + return False + except Exception: + return False + + @override + def format_output(self, result: str) -> str: + try: + log_res = result.split(":")[0] + if log_res == "ERROR": + header = "[ALERT]" + elif log_res == "INFO": + header = "[INFO]" + elif log_res == "SUCCESS": + header = "[SUCCESS]" + else: + header = "[UNKNOWN]" + return ( + f"{header} {result.split(':', 1)[0]}{result.split(':', 1)[1]}" + ) + except Exception as err: + print(err) + return "error" + + +def class_tester() -> None: + print("=== CODE NEXUS - DATA PROCESSOR FOUNDATION ===\n") + print("Initializing Numeric Processor...") + data = [1, 2, 3, 4, 5] + processor = NumericProcessor() + res = processor.process(data) + is_valid = processor.validate(data) + formatted_output = processor.format_output(res) + print(f"processing data: {res}") + if is_valid: + print("Validation: Numeric data verified") + else: + print("Validation: Invalid numeric data") + print(f"Output: {formatted_output}") + print("\nInitializing Text Processor...") + data = "Hello Nexus World" + processor = TextProcessor() + res = processor.process(data) + is_valid = processor.validate(data) + formatted_output = processor.format_output(res) + print(f"processing data: {res}") + if is_valid: + print("Validation: Text data verified") + else: + print("Validation: Invalid text data") + print(f"Output: {formatted_output}") + print("\nInitializing Log Processor...") + data = ["ERROR", "Connection timeout"] + processor = LogProcessor() + res = processor.process(data) + is_valid = processor.validate(data) + formatted_output = processor.format_output(res) + print(f"processing data: {res}") + if is_valid: + print("Validation: Log data verified") + else: + print("Validation: Invalid log data") + print(f"Output: {formatted_output}") + print("\n=== Polymorphic Processing Demo ===\n") + print(f"Result 1:\ + {NumericProcessor().format_output('[1, 2, 3]')}") + print(f"Result 2:\ + {TextProcessor().format_output('Hello World !')}") + print(f"Result 3:\ + {LogProcessor().format_output('INFO: level detected: System ready')}") + print("\nFoundation systems online. Nexus ready for advanced streams.") + + +if __name__ == "__main__": + class_tester() diff --git a/05/ex1/data_stream.py b/05/ex1/data_stream.py new file mode 100644 index 0000000..95b2bcf --- /dev/null +++ b/05/ex1/data_stream.py @@ -0,0 +1,265 @@ +from abc import ABC, abstractmethod +from re import error +import re +from typing import Any, List, Optional, Dict, Union +from typing_extensions import override + + +class DataStream(ABC): + def __init__(self, stream_id: str, stream_type: str) -> None: + self.stream_id = stream_id + self.stream_type = stream_type + self.data_batch: List[str] = [] + + @abstractmethod + def process_batch(self, data_batch: List[Any]) -> str: + pass + + def filter_data( + self, data_batch: List[Any], criteria: Optional[str] = None + ) -> List[Any]: + res = [] + for data in data_batch: + if data.split(":", 1)[0] in criteria: + res += [data] + return res + + def get_stats(self) -> Dict[str, Union[str, int, float]]: + return { + "nb_process": len(self.data_batch), + } + + +class SensorStream(DataStream): + def __init__(self, stream_id: str, stream_type: str) -> None: + super().__init__(stream_id, stream_type) + + def process_batch(self, data_batch: List[Any]) -> str: + res: List[str] = [] + for data in data_batch: + try: + if len(data.split(":", 1)) == 2: + if data.split(":", 1)[0] == "temp": + float(data.split(":", 1)[1]) + elif ( + data.split(":", 1)[0] == "humidity" + or data.split(":", 1)[0] == "pressure" + ): + int(data.split(":", 1)[1]) + else: + continue + res += [data] + except Exception: + continue + self.data_batch += res + return f"{res}" + + @override + def filter_data( + self, data_batch: List[Any], criteria: Optional[str] = None + ) -> List[Any]: + return super().filter_data(data_batch, criteria) + + @override + def get_stats(self) -> Dict[str, Union[str, int, float]]: + temp_data: List[float] = [] + for temp in self.filter_data(self.data_batch, "temp"): + temp_data += [float(temp.split(":", 1)[1])] + res: Dict[str, Union[str, int, float]] = {} + res["nb_process"] = len(self.data_batch) + res["avg_temp"] = sum(temp_data) / len(temp_data) + return res + + +class TransactionStream(DataStream): + def __init__(self, stream_id: str, stream_type: str) -> None: + super().__init__(stream_id, stream_type) + + def process_batch(self, data_batch: List[Any]) -> str: + res: List[str] = [] + for data in data_batch: + try: + if len(data.split(":", 1)) == 2: + if ( + data.split(":", 1)[0] == "buy" + or data.split(":", 1)[0] == "sell" + ): + int(data.split(":", 1)[1]) + else: + continue + res += [data] + except Exception: + continue + self.data_batch += res + return f"{res}" + + @override + def filter_data( + self, data_batch: List[Any], criteria: Optional[str] = None + ) -> List[Any]: + res: List[Any] = [] + if criteria and ">" == criteria[0]: + try: + higher_than = int(criteria.split(">")[1]) + for data in data_batch: + if int(data.split(":", 1)[1]) > higher_than: + res += [data] + except Exception as err: + print(err) + return res + else: + return super().filter_data(data_batch, criteria) + + @override + def get_stats(self) -> Dict[str, Union[str, int, float]]: + net_flow = 0 + for data in self.data_batch: + if data.split(":", 1)[0] == "buy": + net_flow += int(data.split(":", 1)[1]) + else: + net_flow -= int(data.split(":", 1)[1]) + res: Dict[str, Union[str, int, float]] = {} + res["nb_process"] = len(self.data_batch) + res["net_flow"] = net_flow + return res + + +class EventStream(DataStream): + def __init__(self, stream_id: str, stream_type: str) -> None: + super().__init__(stream_id, stream_type) + + def process_batch(self, data_batch: List[Any]) -> str: + res: List[str] = [] + for data in data_batch: + try: + if data not in ["login", "logout", "error"]: + continue + res += [data] + except Exception: + continue + self.data_batch += res + return f"{res}" + + @override + def filter_data( + self, data_batch: List[Any], criteria: Optional[str] = None + ) -> List[Any]: + return super().filter_data(data_batch, criteria) + + @override + def get_stats(self) -> Dict[str, Union[str, int, float]]: + nb_error = 0 + for data in self.data_batch: + if data == "error": + nb_error += 1 + res: Dict[str, Union[str, int, float]] = {} + res["nb_process"] = len(self.data_batch) + res["nb_error"] = nb_error + return res + + +class StreamProcessor: + def __init__(self) -> None: + self.streams: Dict[ + str, List[Union[SensorStream, TransactionStream, EventStream]] + ] = {"sensor": [], "transaction": [], "event": []} + + def process_batch( + self, + data_batch: List[Any], + stream: Union[SensorStream, TransactionStream, EventStream], + ) -> Union[SensorStream, TransactionStream, EventStream]: + try: + stream.process_batch(data_batch) + if isinstance(stream, SensorStream): + self.streams["sensor"] += [stream] + elif isinstance(stream, TransactionStream): + self.streams["transaction"] += [stream] + elif isinstance(stream, EventStream): + self.streams["event"] += [stream] + except Exception as err: + print(err) + return stream + + def get_stats( + self, stream: Union[SensorStream, TransactionStream, EventStream] + ) -> Dict[str, Union[str, int, float]]: + return stream.get_stats() + + def get_high_priority( + self, + ) -> Dict[str, int]: + res: Dict[str, int] = {} + for stream in self.streams["transaction"]: + try: + res["large transaction"] = len( + stream.filter_data(stream.data_batch, ">100") + ) + except Exception: + print("Error on catching large transaction") + continue + for stream in self.streams["event"]: + res["error detected"] = len( + stream.filter_data(stream.data_batch, "error") + ) + return res + + +def data_stream_tester(): + print("=== CODE NEXUS - POLYMORPHIC STREAM SYSTEM ===\n") + sensor_stream = SensorStream("SENSOR_001", "Environmental Data") + print(f"Stream ID: {sensor_stream.stream_id},\ + Type: {sensor_stream.stream_type}") + print(f"Processing sensor batch:\ + {sensor_stream.process_batch(['temp:22.5', 'humidity:65', 'pressure:1013'])}") + sensor_analysis = sensor_stream.get_stats() + print(f"Sensor analysis: {sensor_analysis['nb_process']} reading\ + process, avg temp: {sensor_analysis['avg_temp']}°C") + print() + transaction_stream = TransactionStream("TRANS_001", "Financial Data") + print(f"Stream ID: {transaction_stream.stream_id},\ + Type: {transaction_stream.stream_type}") + print(f"Processing transaction batch:\ + {transaction_stream.process_batch(['buy:100', 'sell:150', 'buy:75'])}") + transaction_analysis = transaction_stream.get_stats() + print(f"Transaction analysis: {transaction_analysis['nb_process']}\ + operations, net flow: {transaction_analysis['net_flow']} units") + print() + event_stream = EventStream("EVENT_001", "System Events") + print(f"Stream ID: {event_stream.stream_id},\ + Type: {event_stream.stream_type}") + print(f"Processing event batch:\ + {event_stream.process_batch(['login', 'error', 'logout'])}") + event_analysis = event_stream.get_stats() + print(f"Sensor analysis: {event_analysis['nb_process']}\ + events, {event_analysis['nb_error']} error detected") + + +def stream_processor_tester(): + print("=== Polymorphic Stream Processing ===\n") + processor = StreamProcessor() + data_batch = [ + ["temp:22.5", "humidity:65", "pressure:1013"], + ["buy:100", "sell:150"], + ["login", "error", "logout", "login", "error"], + ] + streams = [ + SensorStream("SENSOR_001", "Environmental Data"), + TransactionStream("TRANS_001", "Financial Data"), + EventStream("EVENT_001", "System Events"), + ] + for i in range(0, len(data_batch)): + streams[i] = processor.process_batch(data_batch[i], streams[i]) + print("Batch 1 results:") + for stream in streams: + stat = stream.get_stats() + print(f"- {stream.stream_id}: {stat['nb_process']}") + print("\nHigh priority data:") + hp_data = processor.get_high_priority() + for data in hp_data: + print(f"- {hp_data[data]} {data}") + + +if __name__ == "__main__": + data_stream_tester() + stream_processor_tester()