mirror of
https://github.com/DavidGailleton/42-Piscine_Python.git
synced 2026-03-13 20:56:54 +01:00
Compare commits
2 Commits
2c33879342
...
05a3ddc8b6
| Author | SHA1 | Date | |
|---|---|---|---|
| 05a3ddc8b6 | |||
| c16e910c14 |
160
05/ex0/stream_processor.py
Normal file
160
05/ex0/stream_processor.py
Normal file
@@ -0,0 +1,160 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Any
|
||||
from typing_extensions import override
|
||||
|
||||
|
||||
class DataProcessor(ABC):
    """Abstract base for data processors: process, validate, format."""

    @abstractmethod
    def process(self, data: Any) -> str:
        """Turn raw data into its string representation."""
        ...

    @abstractmethod
    def validate(self, data: Any) -> bool:
        """Report whether the raw data is acceptable for this processor."""
        ...

    def format_output(self, result: str) -> str:
        """Wrap a processed result in the default output banner."""
        return f"Processed output: {result}"
|
||||
|
||||
|
||||
class NumericProcessor(DataProcessor):
    """Processor for batches of integers."""

    def process(self, data: Any) -> str:
        """Return the batch's string form, or "error" if formatting fails."""
        try:
            return f"{data}"
        except Exception as err:
            # Only reachable via a broken __str__/__format__ on data.
            print(err)
            return "error"

    def validate(self, data: Any) -> bool:
        """True when *data* is an iterable containing only exact ints.

        `type(n) is int` keeps the original exact-type check: bools and
        int subclasses are rejected, matching the old __name__ comparison.
        """
        try:
            return all(type(n) is int for n in data)
        except Exception:
            # Non-iterable input is simply invalid.
            return False

    @override
    def format_output(self, result: str) -> str:
        """Summarize a stringified int list such as "[1, 2, 3]".

        Returns "Processed N numeric value, sum=S, avg=A" or "error" when
        no value parses (the empty average raises and is caught below).
        The original built the message with a backslash continuation
        inside the f-string, which mangled the spacing.
        """
        try:
            values: list[int] = []
            for token in result.split(", "):
                token = token.strip("[]")
                try:
                    # int() also accepts negatives, which isnumeric() missed.
                    values.append(int(token))
                except ValueError:
                    continue
            avg = sum(values) / len(values)  # ZeroDivisionError if empty
            return (
                f"Processed {len(values)} numeric value, "
                f"sum={sum(values)}, avg={avg:.2f}"
            )
        except Exception as err:
            print(f"NumericProcessor / format_output: {err}")
            return "error"
|
||||
|
||||
|
||||
class TextProcessor(DataProcessor):
    """Processor for text data."""

    def process(self, data: Any) -> str:
        """Return the data's string representation."""
        return str(data)

    def validate(self, data: Any) -> bool:
        """True when *data* is an iterable yielding only str items.

        Note: any plain string validates (its characters are all str).
        """
        try:
            return all(type(char) is str for char in data)
        except Exception:
            # Non-iterable input is invalid.
            return False

    @override
    def format_output(self, result: str) -> str:
        """Report the character and word counts of *result*.

        The original used a backslash continuation inside the f-string,
        which glued the two halves together; a single-space join is the
        intended output.
        """
        return (
            f"Processed text: {len(result)} characters, "
            f"{len(result.split(' '))} words"
        )
|
||||
|
||||
|
||||
class LogProcessor(DataProcessor):
    """Processor for (level, message) log entries."""

    def process(self, data: Any) -> str:
        """Join a two-item entry as "level: message"; "error" on failure."""
        try:
            return f"{data[0]}: {data[1]}"
        except Exception as err:
            print(err)
            return "error"

    def validate(self, data: Any) -> bool:
        """True when *data* is exactly two strings (level, message)."""
        try:
            if len(data) != 2:
                return False
            return all(type(item) is str for item in data)
        except Exception:
            return False

    @override
    def format_output(self, result: str) -> str:
        """Prefix "level: message" with a header chosen from the level.

        The colon after the level is dropped, matching the original
        concatenation of the two split halves. Input without a colon
        still fails the unpack and yields "error", as before. The split
        is now computed once instead of three times, and the if/elif
        ladder is a dict lookup.
        """
        try:
            level, rest = result.split(":", 1)  # ValueError if no colon
            headers = {
                "ERROR": "[ALERT]",
                "INFO": "[INFO]",
                "SUCCESS": "[SUCCESS]",
            }
            header = headers.get(level, "[UNKNOWN]")
            return f"{header} {level}{rest}"
        except Exception as err:
            print(err)
            return "error"
|
||||
|
||||
|
||||
def _demo(title: str, processor: "DataProcessor", data: Any, kind: str) -> None:
    """Run one process/validate/format cycle and print its report."""
    print(f"Initializing {title}...")
    res = processor.process(data)
    is_valid = processor.validate(data)
    formatted = processor.format_output(res)
    print(f"processing data: {res}")
    if is_valid:
        print(f"Validation: {kind} data verified")
    else:
        print(f"Validation: Invalid {kind.lower()} data")
    print(f"Output: {formatted}")


def class_tester() -> None:
    """Demonstrate the three DataProcessor subclasses.

    The three near-identical sections are factored into _demo; the
    "Result N" lines used backslash continuations inside f-strings,
    which mangled the spacing, and are now single clean strings.
    """
    print("=== CODE NEXUS - DATA PROCESSOR FOUNDATION ===\n")
    _demo("Numeric Processor", NumericProcessor(), [1, 2, 3, 4, 5], "Numeric")
    print()
    _demo("Text Processor", TextProcessor(), "Hello Nexus World", "Text")
    print()
    _demo("Log Processor", LogProcessor(), ["ERROR", "Connection timeout"],
          "Log")
    print("\n=== Polymorphic Processing Demo ===\n")
    print(f"Result 1: {NumericProcessor().format_output('[1, 2, 3]')}")
    print(f"Result 2: {TextProcessor().format_output('Hello World !')}")
    print(f"Result 3: "
          f"{LogProcessor().format_output('INFO: level detected: System ready')}")
    print("\nFoundation systems online. Nexus ready for advanced streams.")
|
||||
|
||||
|
||||
# Run the demonstration only when executed as a script, not on import.
if __name__ == "__main__":
    class_tester()
|
||||
265
05/ex1/data_stream.py
Normal file
265
05/ex1/data_stream.py
Normal file
@@ -0,0 +1,265 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from re import error
|
||||
import re
|
||||
from typing import Any, List, Optional, Dict, Union
|
||||
from typing_extensions import override
|
||||
|
||||
|
||||
class DataStream(ABC):
    """Abstract base for typed data streams.

    Attributes:
        stream_id: identifier shown in reports.
        stream_type: human-readable description of the stream.
        data_batch: every entry accepted by process_batch so far.
    """

    def __init__(self, stream_id: str, stream_type: str) -> None:
        self.stream_id = stream_id
        self.stream_type = stream_type
        self.data_batch: List[str] = []

    @abstractmethod
    def process_batch(self, data_batch: List[Any]) -> str:
        """Validate and ingest a batch; return its string representation."""
        pass

    def filter_data(
        self, data_batch: List[Any], criteria: Optional[str] = None
    ) -> List[Any]:
        """Keep entries whose "key:value" key occurs within *criteria*.

        With criteria=None (its own default!) the original crashed with
        TypeError ("x in None"); the whole batch is now returned instead.
        """
        if criteria is None:
            return list(data_batch)
        return [entry for entry in data_batch
                if entry.split(":", 1)[0] in criteria]

    def get_stats(self) -> Dict[str, Union[str, int, float]]:
        """Default statistics: number of entries ingested so far."""
        return {
            "nb_process": len(self.data_batch),
        }
|
||||
|
||||
|
||||
class SensorStream(DataStream):
    """Stream of "reading:value" sensor entries (temp/humidity/pressure)."""

    def __init__(self, stream_id: str, stream_type: str) -> None:
        super().__init__(stream_id, stream_type)

    def process_batch(self, data_batch: List[Any]) -> str:
        """Keep well-formed sensor entries and append them to the stream.

        "temp" must carry a float value, "humidity"/"pressure" an int.
        The original split each entry up to four times and, worse,
        accepted colon-less entries unvalidated; both are fixed.
        """
        accepted: List[str] = []
        for entry in data_batch:
            try:
                parts = entry.split(":", 1)  # split once per entry
                if len(parts) != 2:
                    continue
                key, value = parts
                if key == "temp":
                    float(value)  # raises if not a valid temperature
                elif key in ("humidity", "pressure"):
                    int(value)  # raises if not a valid integer reading
                else:
                    continue
                accepted.append(entry)
            except Exception:
                # Unparseable value or non-string entry: skip it.
                continue
        self.data_batch += accepted
        return f"{accepted}"

    @override
    def filter_data(
        self, data_batch: List[Any], criteria: Optional[str] = None
    ) -> List[Any]:
        """Same key-substring filtering as the base class."""
        return super().filter_data(data_batch, criteria)

    @override
    def get_stats(self) -> Dict[str, Union[str, int, float]]:
        """Entry count plus the average temperature of ingested readings."""
        temps = [
            float(entry.split(":", 1)[1])
            for entry in self.filter_data(self.data_batch, "temp")
        ]
        # The original divided unconditionally and crashed with
        # ZeroDivisionError before any temperature was ingested.
        avg = sum(temps) / len(temps) if temps else 0.0
        return {
            "nb_process": len(self.data_batch),
            "avg_temp": avg,
        }
|
||||
|
||||
|
||||
class TransactionStream(DataStream):
    """Stream of "buy:amount" / "sell:amount" transactions."""

    def __init__(self, stream_id: str, stream_type: str) -> None:
        super().__init__(stream_id, stream_type)

    def process_batch(self, data_batch: List[Any]) -> str:
        """Keep well-formed buy/sell entries with integer amounts.

        The original split each entry several times and accepted
        colon-less entries unvalidated; both are fixed.
        """
        accepted: List[str] = []
        for entry in data_batch:
            try:
                parts = entry.split(":", 1)  # split once per entry
                if len(parts) != 2:
                    continue
                if parts[0] not in ("buy", "sell"):
                    continue
                int(parts[1])  # amount must parse as an int
                accepted.append(entry)
            except Exception:
                # Unparseable amount or non-string entry: skip it.
                continue
        self.data_batch += accepted
        return f"{accepted}"

    @override
    def filter_data(
        self, data_batch: List[Any], criteria: Optional[str] = None
    ) -> List[Any]:
        """Filter by key, or by amount when *criteria* looks like ">N"."""
        if criteria and criteria[0] == ">":
            res: List[Any] = []
            try:
                threshold = int(criteria[1:])
                for entry in data_batch:
                    if int(entry.split(":", 1)[1]) > threshold:
                        res.append(entry)
            except Exception as err:
                # Bad threshold or malformed entry: report, return partial.
                print(err)
            return res
        return super().filter_data(data_batch, criteria)

    @override
    def get_stats(self) -> Dict[str, Union[str, int, float]]:
        """Entry count plus net flow (buys positive, sells negative)."""
        net_flow = 0
        for entry in self.data_batch:
            key, value = entry.split(":", 1)
            # Only "buy"/"sell" ever reach data_batch (see process_batch).
            net_flow += int(value) if key == "buy" else -int(value)
        return {
            "nb_process": len(self.data_batch),
            "net_flow": net_flow,
        }
|
||||
|
||||
|
||||
class EventStream(DataStream):
    """Stream of plain event tokens: login, logout, error."""

    def __init__(self, stream_id: str, stream_type: str) -> None:
        super().__init__(stream_id, stream_type)

    def process_batch(self, data_batch: List[Any]) -> str:
        """Keep only known event tokens and append them to the stream."""
        kept: List[str] = []
        for event in data_batch:
            try:
                if event in ["login", "logout", "error"]:
                    kept.append(event)
            except Exception:
                continue
        self.data_batch += kept
        return f"{kept}"

    @override
    def filter_data(
        self, data_batch: List[Any], criteria: Optional[str] = None
    ) -> List[Any]:
        """Delegate to the base-class key filtering."""
        return super().filter_data(data_batch, criteria)

    @override
    def get_stats(self) -> Dict[str, Union[str, int, float]]:
        """Entry count plus the number of "error" events ingested."""
        stats: Dict[str, Union[str, int, float]] = {}
        stats["nb_process"] = len(self.data_batch)
        stats["nb_error"] = self.data_batch.count("error")
        return stats
|
||||
|
||||
|
||||
class StreamProcessor:
    """Registry that routes batches to streams and aggregates statistics."""

    def __init__(self) -> None:
        # One bucket per stream kind, filled as batches are processed.
        self.streams: Dict[
            str, List[Union[SensorStream, TransactionStream, EventStream]]
        ] = {"sensor": [], "transaction": [], "event": []}

    def process_batch(
        self,
        data_batch: List[Any],
        stream: Union[SensorStream, TransactionStream, EventStream],
    ) -> Union[SensorStream, TransactionStream, EventStream]:
        """Feed *data_batch* to *stream* and register it in its bucket."""
        try:
            stream.process_batch(data_batch)
            if isinstance(stream, SensorStream):
                self.streams["sensor"] += [stream]
            elif isinstance(stream, TransactionStream):
                self.streams["transaction"] += [stream]
            elif isinstance(stream, EventStream):
                self.streams["event"] += [stream]
        except Exception as err:
            print(err)
        return stream

    def get_stats(
        self, stream: Union[SensorStream, TransactionStream, EventStream]
    ) -> Dict[str, Union[str, int, float]]:
        """Return the stream's own statistics."""
        return stream.get_stats()

    def get_high_priority(
        self,
    ) -> Dict[str, int]:
        """Count large transactions (>100) and error events across streams.

        The original assigned instead of accumulating, so with several
        registered streams per bucket only the last one was counted; the
        event loop also lacked the error guard the transaction loop has.
        Keys are still absent when a bucket is empty, as before.
        """
        res: Dict[str, int] = {}
        for stream in self.streams["transaction"]:
            try:
                res["large transaction"] = res.get("large transaction", 0) + len(
                    stream.filter_data(stream.data_batch, ">100")
                )
            except Exception:
                print("Error on catching large transaction")
                continue
        for stream in self.streams["event"]:
            try:
                res["error detected"] = res.get("error detected", 0) + len(
                    stream.filter_data(stream.data_batch, "error")
                )
            except Exception:
                print("Error on counting error events")
                continue
        return res
|
||||
|
||||
|
||||
def data_stream_tester():
    """Exercise each stream class on a small, well-formed batch.

    The original built every multi-line message with a backslash
    continuation inside the f-string, mangling the spacing; adjacent
    f-strings are used instead. The event section also printed
    "Sensor analysis" (copy-paste); it now says "Event analysis".
    """
    print("=== CODE NEXUS - POLYMORPHIC STREAM SYSTEM ===\n")
    sensor_stream = SensorStream("SENSOR_001", "Environmental Data")
    print(f"Stream ID: {sensor_stream.stream_id}, "
          f"Type: {sensor_stream.stream_type}")
    print(f"Processing sensor batch: "
          f"{sensor_stream.process_batch(['temp:22.5', 'humidity:65', 'pressure:1013'])}")
    sensor_analysis = sensor_stream.get_stats()
    print(f"Sensor analysis: {sensor_analysis['nb_process']} reading "
          f"process, avg temp: {sensor_analysis['avg_temp']}°C")
    print()
    transaction_stream = TransactionStream("TRANS_001", "Financial Data")
    print(f"Stream ID: {transaction_stream.stream_id}, "
          f"Type: {transaction_stream.stream_type}")
    print(f"Processing transaction batch: "
          f"{transaction_stream.process_batch(['buy:100', 'sell:150', 'buy:75'])}")
    transaction_analysis = transaction_stream.get_stats()
    print(f"Transaction analysis: {transaction_analysis['nb_process']} "
          f"operations, net flow: {transaction_analysis['net_flow']} units")
    print()
    event_stream = EventStream("EVENT_001", "System Events")
    print(f"Stream ID: {event_stream.stream_id}, "
          f"Type: {event_stream.stream_type}")
    print(f"Processing event batch: "
          f"{event_stream.process_batch(['login', 'error', 'logout'])}")
    event_analysis = event_stream.get_stats()
    print(f"Event analysis: {event_analysis['nb_process']} "
          f"events, {event_analysis['nb_error']} error detected")
|
||||
|
||||
|
||||
def stream_processor_tester():
    """Drive all three stream kinds through one StreamProcessor."""
    print("=== Polymorphic Stream Processing ===\n")
    processor = StreamProcessor()
    batches = [
        ["temp:22.5", "humidity:65", "pressure:1013"],
        ["buy:100", "sell:150"],
        ["login", "error", "logout", "login", "error"],
    ]
    streams = [
        SensorStream("SENSOR_001", "Environmental Data"),
        TransactionStream("TRANS_001", "Financial Data"),
        EventStream("EVENT_001", "System Events"),
    ]
    # Feed each batch to its matching stream and keep the returned stream.
    for idx, batch in enumerate(batches):
        streams[idx] = processor.process_batch(batch, streams[idx])
    print("Batch 1 results:")
    for stream in streams:
        print(f"- {stream.stream_id}: {stream.get_stats()['nb_process']}")
    print("\nHigh priority data:")
    priority = processor.get_high_priority()
    for label in priority:
        print(f"- {priority[label]} {label}")
|
||||
|
||||
|
||||
# Script entry point: run both demos when executed directly, not on import.
if __name__ == "__main__":
    data_stream_tester()
    stream_processor_tester()
|
||||
74
05/ex2/nexus_pipeline.py
Normal file
74
05/ex2/nexus_pipeline.py
Normal file
@@ -0,0 +1,74 @@
|
||||
from abc import ABC
|
||||
from typing import Any, List, Protocol, Union
|
||||
from typing_extensions import override
|
||||
|
||||
|
||||
class ProcessingStage(Protocol):
    """Structural interface: anything exposing a process(data) method."""

    def process(self, data: Any) -> Any:
        """Transform *data* and return the result."""
        ...
|
||||
|
||||
|
||||
class InputStage:
    """Pipeline entry stage (stub awaiting implementation)."""

    def process(self, data: Any) -> dict:
        """Parse raw input into a dict; not yet implemented.

        The original annotated the return as ``Dict`` without importing
        it from typing, raising NameError when the module loaded; the
        builtin ``dict`` (valid on this file's Python >= 3.10) fixes it.
        """
        pass
|
||||
|
||||
|
||||
class TransformStage:
    """Pipeline middle stage (stub awaiting implementation)."""

    def process(self, data: Any) -> dict:
        """Transform a parsed dict; not yet implemented.

        The original annotated the return as ``Dict`` without importing
        it from typing, raising NameError when the module loaded; the
        builtin ``dict`` (valid on this file's Python >= 3.10) fixes it.
        """
        pass
|
||||
|
||||
|
||||
class OutputStage:
    """Pipeline exit stage (stub awaiting implementation)."""

    def process(self, data: Any) -> str:
        """Render processed data as a string; not yet implemented."""
        ...
|
||||
|
||||
|
||||
class ProcessingPipeline(ABC):
    """Ordered chain of processing stages applied to incoming data."""

    def __init__(self) -> None:
        # Stages run in insertion order.
        self.stages: List[InputStage | TransformStage | OutputStage] = []

    def add_stage(
        self, stage: Union[InputStage, TransformStage, OutputStage]
    ) -> None:
        """Append *stage* to the end of the pipeline.

        The original body was a bare ``pass`` that silently dropped the
        stage; appending is the evident intent given ``self.stages``.
        """
        self.stages.append(stage)

    def process(self, data: Any) -> Any:
        """Run *data* through every stage in order and return the result.

        With no stages registered, *data* is returned unchanged (the
        original stub returned None).
        """
        for stage in self.stages:
            data = stage.process(data)
        return data
|
||||
|
||||
|
||||
class JSONAdapter(ProcessingPipeline):
    """Pipeline adapter for JSON-formatted input."""

    def __init__(self, pipeline_id: str) -> None:
        super().__init__()
        # Identifier used for reporting/tracing.
        self.pipeline_id = pipeline_id

    @override
    def process(self, data: Any) -> Any:
        """Delegate to the base pipeline."""
        return super().process(data)
|
||||
|
||||
|
||||
class CSVAdapter(ProcessingPipeline):
    """Pipeline adapter for CSV-formatted input."""

    def __init__(self, pipeline_id: str) -> None:
        super().__init__()
        # Identifier used for reporting/tracing.
        self.pipeline_id = pipeline_id

    @override
    def process(self, data: Any) -> Any:
        """Delegate to the base pipeline."""
        return super().process(data)
|
||||
|
||||
|
||||
class StreamAdapter(ProcessingPipeline):
    """Pipeline adapter for streamed input."""

    def __init__(self, pipeline_id: str) -> None:
        super().__init__()
        # Identifier used for reporting/tracing.
        self.pipeline_id = pipeline_id

    @override
    def process(self, data: Any) -> Any:
        """Delegate to the base pipeline."""
        return super().process(data)
|
||||
|
||||
|
||||
def tester():
    # TODO: demonstrate the pipeline adapters once the stages are
    # implemented; currently a placeholder (exercise in progress).
    pass
|
||||
|
||||
|
||||
# Script entry point; currently a no-op while the exercise is unfinished.
if __name__ == "__main__":
    pass
|
||||
Reference in New Issue
Block a user