diff --git a/05/ex0/stream_processor.py b/05/ex0/stream_processor.py index f92502e..ce6d935 100644 --- a/05/ex0/stream_processor.py +++ b/05/ex0/stream_processor.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from typing import Any +from typing import Any, Union from typing_extensions import override @@ -111,8 +111,8 @@ class LogProcessor(DataProcessor): def class_tester() -> None: print("=== CODE NEXUS - DATA PROCESSOR FOUNDATION ===\n") print("Initializing Numeric Processor...") - data = [1, 2, 3, 4, 5] - processor = NumericProcessor() + data: Union[list[int | str], str] = [1, 2, 3, 4, 5] + processor: DataProcessor = NumericProcessor() res = processor.process(data) is_valid = processor.validate(data) formatted_output = processor.format_output(res) diff --git a/05/ex1/data_stream.py b/05/ex1/data_stream.py index 95b2bcf..4ccdecd 100644 --- a/05/ex1/data_stream.py +++ b/05/ex1/data_stream.py @@ -1,6 +1,4 @@ from abc import ABC, abstractmethod -from re import error -import re from typing import Any, List, Optional, Dict, Union from typing_extensions import override @@ -19,6 +17,8 @@ class DataStream(ABC): self, data_batch: List[Any], criteria: Optional[str] = None ) -> List[Any]: res = [] + if not criteria: + return data_batch for data in data_batch: if data.split(":", 1)[0] in criteria: res += [data] @@ -31,7 +31,9 @@ class DataStream(ABC): class SensorStream(DataStream): - def __init__(self, stream_id: str, stream_type: str) -> None: + def __init__( + self, stream_id: str, stream_type: str = "Environmental Data" + ) -> None: super().__init__(stream_id, stream_type) def process_batch(self, data_batch: List[Any]) -> str: @@ -72,7 +74,9 @@ class SensorStream(DataStream): class TransactionStream(DataStream): - def __init__(self, stream_id: str, stream_type: str) -> None: + def __init__( + self, stream_id: str, stream_type: str = "Financial Data" + ) -> None: super().__init__(stream_id, stream_type) def process_batch(self, data_batch: List[Any]) -> str: @@ -125,7 +129,9 @@ class TransactionStream(DataStream): class EventStream(DataStream): - def __init__(self, stream_id: str, stream_type: str) -> None: + def __init__( + self, stream_id: str, stream_type: str = "System Events" + ) -> None: super().__init__(stream_id, stream_type) def process_batch(self, data_batch: List[Any]) -> str: @@ -160,15 +166,17 @@ class EventStream(DataStream): class StreamProcessor: def __init__(self) -> None: - self.streams: Dict[ - str, List[Union[SensorStream, TransactionStream, EventStream]] - ] = {"sensor": [], "transaction": [], "event": []} + self.streams: Dict[str, List[DataStream]] = { + "sensor": [], + "transaction": [], + "event": [], + } def process_batch( self, data_batch: List[Any], - stream: Union[SensorStream, TransactionStream, EventStream], - ) -> Union[SensorStream, TransactionStream, EventStream]: + stream: DataStream, + ) -> DataStream: try: stream.process_batch(data_batch) if isinstance(stream, SensorStream): @@ -205,7 +213,7 @@ class StreamProcessor: return res -def data_stream_tester(): +def data_stream_tester() -> None: print("=== CODE NEXUS - POLYMORPHIC STREAM SYSTEM ===\n") sensor_stream = SensorStream("SENSOR_001", "Environmental Data") print(f"Stream ID: {sensor_stream.stream_id},\ @@ -235,7 +243,7 @@ def data_stream_tester(): events, {event_analysis['nb_error']} error detected") -def stream_processor_tester(): +def stream_processor_tester() -> None: print("=== Polymorphic Stream Processing ===\n") processor = StreamProcessor() data_batch = [ @@ -243,7 +251,7 @@ def stream_processor_tester(): ["buy:100", "sell:150"], ["login", "error", "logout", "login", "error"], ] - streams = [ + streams: list[DataStream] = [ SensorStream("SENSOR_001", "Environmental Data"), TransactionStream("TRANS_001", "Financial Data"), EventStream("EVENT_001", "System Events"), diff --git a/05/ex2/nexus_pipeline.py b/05/ex2/nexus_pipeline.py index 73ce149..b07537c 100644 --- a/05/ex2/nexus_pipeline.py +++ b/05/ex2/nexus_pipeline.py @@ -1,5 +1,5 @@ from abc import ABC -from typing import Any, List, Protocol, Union +from typing import Any, Dict, List, Protocol, Union from typing_extensions import override @@ -9,18 +9,35 @@ class ProcessingStage(Protocol): class InputStage: - def process(self, data: Any) -> Dict: - pass + def process(self, data: Any) -> Dict[int, float]: + res: Dict[int, float] = {} + i = 0 + for n in data: + try: + if not isinstance(n, float): + n = float(n) + res[i] = n + i += 1 + except ValueError: + continue + return res class TransformStage: - def process(self, data: Any) -> Dict: - pass + def process(self, data: Any) -> Dict[str, float]: + res: Dict[str, Union[int, float]] = {} + res["processed_data"] = len(data) + try: + res["avg"] = sum(data.values()) / len(data) + except ZeroDivisionError: + res["avg"] = 0 + return res class OutputStage: def process(self, data: Any) -> str: - pass + return f"Summary:\n\t\ +- Processed_data: {data['processed_data']}\n\t- avg temp: {data['avg']:.1f}°C" class ProcessingPipeline(ABC): @@ -30,10 +47,12 @@ class ProcessingPipeline(ABC): def add_stage( self, stage: Union[InputStage, TransformStage, OutputStage] ) -> None: - pass + self.stages += [stage] def process(self, data: Any) -> Any: - pass + for stage in self.stages: + data = stage.process(data) + return data class JSONAdapter(ProcessingPipeline): @@ -43,7 +62,10 @@ class JSONAdapter(ProcessingPipeline): @override def process(self, data: Any) -> Any: - return super().process(data) + res: List[Any] = [] + for n in data: + res += [data[n]] + return super().process(res) class CSVAdapter(ProcessingPipeline): @@ -53,7 +75,7 @@ class CSVAdapter(ProcessingPipeline): @override def process(self, data: Any) -> Any: - return super().process(data) + return super().process(data.split(",")) class StreamAdapter(ProcessingPipeline): @@ -63,12 +85,65 @@ class StreamAdapter(ProcessingPipeline): @override def process(self, data: Any) -> Any: + if not isinstance(data, List): + raise Exception return super().process(data) -def tester(): - pass +class NexusManager: + def __init__(self, pipelines: List[ProcessingPipeline]) -> None: + self.pipelines: List[ProcessingPipeline] = pipelines + + def add_pipeline(self, pipeline: ProcessingPipeline) -> None: + self.pipelines += [pipeline] + + def process_data(self, data: Any) -> str: + res: str | None = None + for pipeline in self.pipelines: + pipeline.add_stage(InputStage()) + pipeline.add_stage(TransformStage()) + pipeline.add_stage(OutputStage()) + try: + res = pipeline.process(data) + break + except Exception: + continue + if res is None: + return "[ERROR] Unknown format, incompatible with pipelines" + return res + + +def tester() -> None: + print("=== CODE NEXUS - ENTERPRISE PIPELINE SYSTEM ===\n") + print("Initializing Nexus Manager...") + manager = NexusManager( + [ + JSONAdapter("JSON_01"), + CSVAdapter("CSV_01"), + StreamAdapter("Stream_01"), + ] + ) + print("\n=== test JSONAdapter ===") + data: Any = {"temp": 10, "est": "10.3", "t": 10.6, "p": "Hello"} + print(f"input: {data}") + res = manager.process_data(data) + print(f"Output: {res}\n") + print("\n=== test CSVAdapter ===") + data = "10,20,40,30" + print(f'input: "{data}"') + res = manager.process_data(data) + print(f"Output: {res}\n") + print("\n=== test StreamAdapter ===") + data = [10, 30, 0, "100"] + print(f"input: {data}") + res = manager.process_data(data) + print(f"Output: {res}\n") + print("\n=== test Invalid input ===") + data = 10 + print(f"input: {data}") + res = manager.process_data(data) + print(f"Output: {res}\n") if __name__ == "__main__": - pass + tester()