mirror of
https://github.com/DavidGailleton/42-Piscine_Python.git
synced 2026-03-14 05:06:55 +01:00
266 lines
9.0 KiB
Python
266 lines
9.0 KiB
Python
from abc import ABC, abstractmethod
|
|
from re import error
|
|
import re
|
|
from typing import Any, List, Optional, Dict, Union
|
|
from typing_extensions import override
|
|
|
|
|
|
class DataStream(ABC):
|
|
def __init__(self, stream_id: str, stream_type: str) -> None:
|
|
self.stream_id = stream_id
|
|
self.stream_type = stream_type
|
|
self.data_batch: List[str] = []
|
|
|
|
@abstractmethod
|
|
def process_batch(self, data_batch: List[Any]) -> str:
|
|
pass
|
|
|
|
def filter_data(
|
|
self, data_batch: List[Any], criteria: Optional[str] = None
|
|
) -> List[Any]:
|
|
res = []
|
|
for data in data_batch:
|
|
if data.split(":", 1)[0] in criteria:
|
|
res += [data]
|
|
return res
|
|
|
|
def get_stats(self) -> Dict[str, Union[str, int, float]]:
|
|
return {
|
|
"nb_process": len(self.data_batch),
|
|
}
|
|
|
|
|
|
class SensorStream(DataStream):
|
|
def __init__(self, stream_id: str, stream_type: str) -> None:
|
|
super().__init__(stream_id, stream_type)
|
|
|
|
def process_batch(self, data_batch: List[Any]) -> str:
|
|
res: List[str] = []
|
|
for data in data_batch:
|
|
try:
|
|
if len(data.split(":", 1)) == 2:
|
|
if data.split(":", 1)[0] == "temp":
|
|
float(data.split(":", 1)[1])
|
|
elif (
|
|
data.split(":", 1)[0] == "humidity"
|
|
or data.split(":", 1)[0] == "pressure"
|
|
):
|
|
int(data.split(":", 1)[1])
|
|
else:
|
|
continue
|
|
res += [data]
|
|
except Exception:
|
|
continue
|
|
self.data_batch += res
|
|
return f"{res}"
|
|
|
|
@override
|
|
def filter_data(
|
|
self, data_batch: List[Any], criteria: Optional[str] = None
|
|
) -> List[Any]:
|
|
return super().filter_data(data_batch, criteria)
|
|
|
|
@override
|
|
def get_stats(self) -> Dict[str, Union[str, int, float]]:
|
|
temp_data: List[float] = []
|
|
for temp in self.filter_data(self.data_batch, "temp"):
|
|
temp_data += [float(temp.split(":", 1)[1])]
|
|
res: Dict[str, Union[str, int, float]] = {}
|
|
res["nb_process"] = len(self.data_batch)
|
|
res["avg_temp"] = sum(temp_data) / len(temp_data)
|
|
return res
|
|
|
|
|
|
class TransactionStream(DataStream):
|
|
def __init__(self, stream_id: str, stream_type: str) -> None:
|
|
super().__init__(stream_id, stream_type)
|
|
|
|
def process_batch(self, data_batch: List[Any]) -> str:
|
|
res: List[str] = []
|
|
for data in data_batch:
|
|
try:
|
|
if len(data.split(":", 1)) == 2:
|
|
if (
|
|
data.split(":", 1)[0] == "buy"
|
|
or data.split(":", 1)[0] == "sell"
|
|
):
|
|
int(data.split(":", 1)[1])
|
|
else:
|
|
continue
|
|
res += [data]
|
|
except Exception:
|
|
continue
|
|
self.data_batch += res
|
|
return f"{res}"
|
|
|
|
@override
|
|
def filter_data(
|
|
self, data_batch: List[Any], criteria: Optional[str] = None
|
|
) -> List[Any]:
|
|
res: List[Any] = []
|
|
if criteria and ">" == criteria[0]:
|
|
try:
|
|
higher_than = int(criteria.split(">")[1])
|
|
for data in data_batch:
|
|
if int(data.split(":", 1)[1]) > higher_than:
|
|
res += [data]
|
|
except Exception as err:
|
|
print(err)
|
|
return res
|
|
else:
|
|
return super().filter_data(data_batch, criteria)
|
|
|
|
@override
|
|
def get_stats(self) -> Dict[str, Union[str, int, float]]:
|
|
net_flow = 0
|
|
for data in self.data_batch:
|
|
if data.split(":", 1)[0] == "buy":
|
|
net_flow += int(data.split(":", 1)[1])
|
|
else:
|
|
net_flow -= int(data.split(":", 1)[1])
|
|
res: Dict[str, Union[str, int, float]] = {}
|
|
res["nb_process"] = len(self.data_batch)
|
|
res["net_flow"] = net_flow
|
|
return res
|
|
|
|
|
|
class EventStream(DataStream):
|
|
def __init__(self, stream_id: str, stream_type: str) -> None:
|
|
super().__init__(stream_id, stream_type)
|
|
|
|
def process_batch(self, data_batch: List[Any]) -> str:
|
|
res: List[str] = []
|
|
for data in data_batch:
|
|
try:
|
|
if data not in ["login", "logout", "error"]:
|
|
continue
|
|
res += [data]
|
|
except Exception:
|
|
continue
|
|
self.data_batch += res
|
|
return f"{res}"
|
|
|
|
@override
|
|
def filter_data(
|
|
self, data_batch: List[Any], criteria: Optional[str] = None
|
|
) -> List[Any]:
|
|
return super().filter_data(data_batch, criteria)
|
|
|
|
@override
|
|
def get_stats(self) -> Dict[str, Union[str, int, float]]:
|
|
nb_error = 0
|
|
for data in self.data_batch:
|
|
if data == "error":
|
|
nb_error += 1
|
|
res: Dict[str, Union[str, int, float]] = {}
|
|
res["nb_process"] = len(self.data_batch)
|
|
res["nb_error"] = nb_error
|
|
return res
|
|
|
|
|
|
class StreamProcessor:
|
|
def __init__(self) -> None:
|
|
self.streams: Dict[
|
|
str, List[Union[SensorStream, TransactionStream, EventStream]]
|
|
] = {"sensor": [], "transaction": [], "event": []}
|
|
|
|
def process_batch(
|
|
self,
|
|
data_batch: List[Any],
|
|
stream: Union[SensorStream, TransactionStream, EventStream],
|
|
) -> Union[SensorStream, TransactionStream, EventStream]:
|
|
try:
|
|
stream.process_batch(data_batch)
|
|
if isinstance(stream, SensorStream):
|
|
self.streams["sensor"] += [stream]
|
|
elif isinstance(stream, TransactionStream):
|
|
self.streams["transaction"] += [stream]
|
|
elif isinstance(stream, EventStream):
|
|
self.streams["event"] += [stream]
|
|
except Exception as err:
|
|
print(err)
|
|
return stream
|
|
|
|
def get_stats(
|
|
self, stream: Union[SensorStream, TransactionStream, EventStream]
|
|
) -> Dict[str, Union[str, int, float]]:
|
|
return stream.get_stats()
|
|
|
|
def get_high_priority(
|
|
self,
|
|
) -> Dict[str, int]:
|
|
res: Dict[str, int] = {}
|
|
for stream in self.streams["transaction"]:
|
|
try:
|
|
res["large transaction"] = len(
|
|
stream.filter_data(stream.data_batch, ">100")
|
|
)
|
|
except Exception:
|
|
print("Error on catching large transaction")
|
|
continue
|
|
for stream in self.streams["event"]:
|
|
res["error detected"] = len(
|
|
stream.filter_data(stream.data_batch, "error")
|
|
)
|
|
return res
|
|
|
|
|
|
def data_stream_tester():
|
|
print("=== CODE NEXUS - POLYMORPHIC STREAM SYSTEM ===\n")
|
|
sensor_stream = SensorStream("SENSOR_001", "Environmental Data")
|
|
print(f"Stream ID: {sensor_stream.stream_id},\
|
|
Type: {sensor_stream.stream_type}")
|
|
print(f"Processing sensor batch:\
|
|
{sensor_stream.process_batch(['temp:22.5', 'humidity:65', 'pressure:1013'])}")
|
|
sensor_analysis = sensor_stream.get_stats()
|
|
print(f"Sensor analysis: {sensor_analysis['nb_process']} reading\
|
|
process, avg temp: {sensor_analysis['avg_temp']}°C")
|
|
print()
|
|
transaction_stream = TransactionStream("TRANS_001", "Financial Data")
|
|
print(f"Stream ID: {transaction_stream.stream_id},\
|
|
Type: {transaction_stream.stream_type}")
|
|
print(f"Processing transaction batch:\
|
|
{transaction_stream.process_batch(['buy:100', 'sell:150', 'buy:75'])}")
|
|
transaction_analysis = transaction_stream.get_stats()
|
|
print(f"Transaction analysis: {transaction_analysis['nb_process']}\
|
|
operations, net flow: {transaction_analysis['net_flow']} units")
|
|
print()
|
|
event_stream = EventStream("EVENT_001", "System Events")
|
|
print(f"Stream ID: {event_stream.stream_id},\
|
|
Type: {event_stream.stream_type}")
|
|
print(f"Processing event batch:\
|
|
{event_stream.process_batch(['login', 'error', 'logout'])}")
|
|
event_analysis = event_stream.get_stats()
|
|
print(f"Sensor analysis: {event_analysis['nb_process']}\
|
|
events, {event_analysis['nb_error']} error detected")
|
|
|
|
|
|
def stream_processor_tester():
|
|
print("=== Polymorphic Stream Processing ===\n")
|
|
processor = StreamProcessor()
|
|
data_batch = [
|
|
["temp:22.5", "humidity:65", "pressure:1013"],
|
|
["buy:100", "sell:150"],
|
|
["login", "error", "logout", "login", "error"],
|
|
]
|
|
streams = [
|
|
SensorStream("SENSOR_001", "Environmental Data"),
|
|
TransactionStream("TRANS_001", "Financial Data"),
|
|
EventStream("EVENT_001", "System Events"),
|
|
]
|
|
for i in range(0, len(data_batch)):
|
|
streams[i] = processor.process_batch(data_batch[i], streams[i])
|
|
print("Batch 1 results:")
|
|
for stream in streams:
|
|
stat = stream.get_stats()
|
|
print(f"- {stream.stream_id}: {stat['nb_process']}")
|
|
print("\nHigh priority data:")
|
|
hp_data = processor.get_high_priority()
|
|
for data in hp_data:
|
|
print(f"- {hp_data[data]} {data}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
data_stream_tester()
|
|
stream_processor_tester()
|