Files
42-Piscine_Python/05/ex1/data_stream.py
2026-02-11 18:02:15 +01:00

266 lines
9.0 KiB
Python

from abc import ABC, abstractmethod
from re import error
import re
from typing import Any, List, Optional, Dict, Union
from typing_extensions import override
class DataStream(ABC):
def __init__(self, stream_id: str, stream_type: str) -> None:
self.stream_id = stream_id
self.stream_type = stream_type
self.data_batch: List[str] = []
@abstractmethod
def process_batch(self, data_batch: List[Any]) -> str:
pass
def filter_data(
self, data_batch: List[Any], criteria: Optional[str] = None
) -> List[Any]:
res = []
for data in data_batch:
if data.split(":", 1)[0] in criteria:
res += [data]
return res
def get_stats(self) -> Dict[str, Union[str, int, float]]:
return {
"nb_process": len(self.data_batch),
}
class SensorStream(DataStream):
def __init__(self, stream_id: str, stream_type: str) -> None:
super().__init__(stream_id, stream_type)
def process_batch(self, data_batch: List[Any]) -> str:
res: List[str] = []
for data in data_batch:
try:
if len(data.split(":", 1)) == 2:
if data.split(":", 1)[0] == "temp":
float(data.split(":", 1)[1])
elif (
data.split(":", 1)[0] == "humidity"
or data.split(":", 1)[0] == "pressure"
):
int(data.split(":", 1)[1])
else:
continue
res += [data]
except Exception:
continue
self.data_batch += res
return f"{res}"
@override
def filter_data(
self, data_batch: List[Any], criteria: Optional[str] = None
) -> List[Any]:
return super().filter_data(data_batch, criteria)
@override
def get_stats(self) -> Dict[str, Union[str, int, float]]:
temp_data: List[float] = []
for temp in self.filter_data(self.data_batch, "temp"):
temp_data += [float(temp.split(":", 1)[1])]
res: Dict[str, Union[str, int, float]] = {}
res["nb_process"] = len(self.data_batch)
res["avg_temp"] = sum(temp_data) / len(temp_data)
return res
class TransactionStream(DataStream):
def __init__(self, stream_id: str, stream_type: str) -> None:
super().__init__(stream_id, stream_type)
def process_batch(self, data_batch: List[Any]) -> str:
res: List[str] = []
for data in data_batch:
try:
if len(data.split(":", 1)) == 2:
if (
data.split(":", 1)[0] == "buy"
or data.split(":", 1)[0] == "sell"
):
int(data.split(":", 1)[1])
else:
continue
res += [data]
except Exception:
continue
self.data_batch += res
return f"{res}"
@override
def filter_data(
self, data_batch: List[Any], criteria: Optional[str] = None
) -> List[Any]:
res: List[Any] = []
if criteria and ">" == criteria[0]:
try:
higher_than = int(criteria.split(">")[1])
for data in data_batch:
if int(data.split(":", 1)[1]) > higher_than:
res += [data]
except Exception as err:
print(err)
return res
else:
return super().filter_data(data_batch, criteria)
@override
def get_stats(self) -> Dict[str, Union[str, int, float]]:
net_flow = 0
for data in self.data_batch:
if data.split(":", 1)[0] == "buy":
net_flow += int(data.split(":", 1)[1])
else:
net_flow -= int(data.split(":", 1)[1])
res: Dict[str, Union[str, int, float]] = {}
res["nb_process"] = len(self.data_batch)
res["net_flow"] = net_flow
return res
class EventStream(DataStream):
def __init__(self, stream_id: str, stream_type: str) -> None:
super().__init__(stream_id, stream_type)
def process_batch(self, data_batch: List[Any]) -> str:
res: List[str] = []
for data in data_batch:
try:
if data not in ["login", "logout", "error"]:
continue
res += [data]
except Exception:
continue
self.data_batch += res
return f"{res}"
@override
def filter_data(
self, data_batch: List[Any], criteria: Optional[str] = None
) -> List[Any]:
return super().filter_data(data_batch, criteria)
@override
def get_stats(self) -> Dict[str, Union[str, int, float]]:
nb_error = 0
for data in self.data_batch:
if data == "error":
nb_error += 1
res: Dict[str, Union[str, int, float]] = {}
res["nb_process"] = len(self.data_batch)
res["nb_error"] = nb_error
return res
class StreamProcessor:
def __init__(self) -> None:
self.streams: Dict[
str, List[Union[SensorStream, TransactionStream, EventStream]]
] = {"sensor": [], "transaction": [], "event": []}
def process_batch(
self,
data_batch: List[Any],
stream: Union[SensorStream, TransactionStream, EventStream],
) -> Union[SensorStream, TransactionStream, EventStream]:
try:
stream.process_batch(data_batch)
if isinstance(stream, SensorStream):
self.streams["sensor"] += [stream]
elif isinstance(stream, TransactionStream):
self.streams["transaction"] += [stream]
elif isinstance(stream, EventStream):
self.streams["event"] += [stream]
except Exception as err:
print(err)
return stream
def get_stats(
self, stream: Union[SensorStream, TransactionStream, EventStream]
) -> Dict[str, Union[str, int, float]]:
return stream.get_stats()
def get_high_priority(
self,
) -> Dict[str, int]:
res: Dict[str, int] = {}
for stream in self.streams["transaction"]:
try:
res["large transaction"] = len(
stream.filter_data(stream.data_batch, ">100")
)
except Exception:
print("Error on catching large transaction")
continue
for stream in self.streams["event"]:
res["error detected"] = len(
stream.filter_data(stream.data_batch, "error")
)
return res
def data_stream_tester():
print("=== CODE NEXUS - POLYMORPHIC STREAM SYSTEM ===\n")
sensor_stream = SensorStream("SENSOR_001", "Environmental Data")
print(f"Stream ID: {sensor_stream.stream_id},\
Type: {sensor_stream.stream_type}")
print(f"Processing sensor batch:\
{sensor_stream.process_batch(['temp:22.5', 'humidity:65', 'pressure:1013'])}")
sensor_analysis = sensor_stream.get_stats()
print(f"Sensor analysis: {sensor_analysis['nb_process']} reading\
process, avg temp: {sensor_analysis['avg_temp']}°C")
print()
transaction_stream = TransactionStream("TRANS_001", "Financial Data")
print(f"Stream ID: {transaction_stream.stream_id},\
Type: {transaction_stream.stream_type}")
print(f"Processing transaction batch:\
{transaction_stream.process_batch(['buy:100', 'sell:150', 'buy:75'])}")
transaction_analysis = transaction_stream.get_stats()
print(f"Transaction analysis: {transaction_analysis['nb_process']}\
operations, net flow: {transaction_analysis['net_flow']} units")
print()
event_stream = EventStream("EVENT_001", "System Events")
print(f"Stream ID: {event_stream.stream_id},\
Type: {event_stream.stream_type}")
print(f"Processing event batch:\
{event_stream.process_batch(['login', 'error', 'logout'])}")
event_analysis = event_stream.get_stats()
print(f"Sensor analysis: {event_analysis['nb_process']}\
events, {event_analysis['nb_error']} error detected")
def stream_processor_tester():
print("=== Polymorphic Stream Processing ===\n")
processor = StreamProcessor()
data_batch = [
["temp:22.5", "humidity:65", "pressure:1013"],
["buy:100", "sell:150"],
["login", "error", "logout", "login", "error"],
]
streams = [
SensorStream("SENSOR_001", "Environmental Data"),
TransactionStream("TRANS_001", "Financial Data"),
EventStream("EVENT_001", "System Events"),
]
for i in range(0, len(data_batch)):
streams[i] = processor.process_batch(data_batch[i], streams[i])
print("Batch 1 results:")
for stream in streams:
stat = stream.get_stats()
print(f"- {stream.stream_id}: {stat['nb_process']}")
print("\nHigh priority data:")
hp_data = processor.get_high_priority()
for data in hp_data:
print(f"- {hp_data[data]} {data}")
if __name__ == "__main__":
data_stream_tester()
stream_processor_tester()