mirror of
https://github.com/DavidGailleton/42-Piscine_Python.git
synced 2026-03-14 05:06:55 +01:00
Module 05 finish + mypy strict + flake
This commit is contained in:
@@ -1,6 +1,4 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from re import error
|
||||
import re
|
||||
from typing import Any, List, Optional, Dict, Union
|
||||
from typing_extensions import override
|
||||
|
||||
@@ -19,6 +17,8 @@ class DataStream(ABC):
|
||||
self, data_batch: List[Any], criteria: Optional[str] = None
|
||||
) -> List[Any]:
|
||||
res = []
|
||||
if not criteria:
|
||||
return data_batch
|
||||
for data in data_batch:
|
||||
if data.split(":", 1)[0] in criteria:
|
||||
res += [data]
|
||||
@@ -31,7 +31,9 @@ class DataStream(ABC):
|
||||
|
||||
|
||||
class SensorStream(DataStream):
|
||||
def __init__(self, stream_id: str, stream_type: str) -> None:
|
||||
def __init__(
|
||||
self, stream_id: str, stream_type: str = "Environmental Data"
|
||||
) -> None:
|
||||
super().__init__(stream_id, stream_type)
|
||||
|
||||
def process_batch(self, data_batch: List[Any]) -> str:
|
||||
@@ -72,7 +74,9 @@ class SensorStream(DataStream):
|
||||
|
||||
|
||||
class TransactionStream(DataStream):
|
||||
def __init__(self, stream_id: str, stream_type: str) -> None:
|
||||
def __init__(
|
||||
self, stream_id: str, stream_type: str = "Financial Data"
|
||||
) -> None:
|
||||
super().__init__(stream_id, stream_type)
|
||||
|
||||
def process_batch(self, data_batch: List[Any]) -> str:
|
||||
@@ -125,7 +129,9 @@ class TransactionStream(DataStream):
|
||||
|
||||
|
||||
class EventStream(DataStream):
|
||||
def __init__(self, stream_id: str, stream_type: str) -> None:
|
||||
def __init__(
|
||||
self, stream_id: str, stream_type: str = "System Events"
|
||||
) -> None:
|
||||
super().__init__(stream_id, stream_type)
|
||||
|
||||
def process_batch(self, data_batch: List[Any]) -> str:
|
||||
@@ -160,15 +166,17 @@ class EventStream(DataStream):
|
||||
|
||||
class StreamProcessor:
|
||||
def __init__(self) -> None:
|
||||
self.streams: Dict[
|
||||
str, List[Union[SensorStream, TransactionStream, EventStream]]
|
||||
] = {"sensor": [], "transaction": [], "event": []}
|
||||
self.streams: Dict[str, List[DataStream]] = {
|
||||
"sensor": [],
|
||||
"transaction": [],
|
||||
"event": [],
|
||||
}
|
||||
|
||||
def process_batch(
|
||||
self,
|
||||
data_batch: List[Any],
|
||||
stream: Union[SensorStream, TransactionStream, EventStream],
|
||||
) -> Union[SensorStream, TransactionStream, EventStream]:
|
||||
stream: DataStream,
|
||||
) -> DataStream:
|
||||
try:
|
||||
stream.process_batch(data_batch)
|
||||
if isinstance(stream, SensorStream):
|
||||
@@ -205,7 +213,7 @@ class StreamProcessor:
|
||||
return res
|
||||
|
||||
|
||||
def data_stream_tester():
|
||||
def data_stream_tester() -> None:
|
||||
print("=== CODE NEXUS - POLYMORPHIC STREAM SYSTEM ===\n")
|
||||
sensor_stream = SensorStream("SENSOR_001", "Environmental Data")
|
||||
print(f"Stream ID: {sensor_stream.stream_id},\
|
||||
@@ -235,7 +243,7 @@ def data_stream_tester():
|
||||
events, {event_analysis['nb_error']} error detected")
|
||||
|
||||
|
||||
def stream_processor_tester():
|
||||
def stream_processor_tester() -> None:
|
||||
print("=== Polymorphic Stream Processing ===\n")
|
||||
processor = StreamProcessor()
|
||||
data_batch = [
|
||||
@@ -243,7 +251,7 @@ def stream_processor_tester():
|
||||
["buy:100", "sell:150"],
|
||||
["login", "error", "logout", "login", "error"],
|
||||
]
|
||||
streams = [
|
||||
streams: list[DataStream] = [
|
||||
SensorStream("SENSOR_001", "Environmental Data"),
|
||||
TransactionStream("TRANS_001", "Financial Data"),
|
||||
EventStream("EVENT_001", "System Events"),
|
||||
|
||||
Reference in New Issue
Block a user