Data Processing Pipeline
Buffered Message Processing
High-frequency data is processed in batches for optimal performance:
class BufferedMessageProcessor:
def __init__(self, batch_size=50, flush_interval=0.1):
self.batch_size = batch_size
self.flush_interval = flush_interval
self.buffer = []
self.last_flush = time.time()Batch Processing Logic:
Messages accumulate in buffer until batch_size is reached
Automatic flush every 100ms if buffer not empty
Parallel processing of exchange-specific data
Error handling with exception logging
Pub/Sub Distribution
Redis pub/sub enables real-time data distribution:
Channels Used:
aster:funding:rates- Aster funding rate updatesbackpack:funding:rates- Backpack funding rate updateslighter:funding:rates- zkLighter funding rate updatesorderbook:updates- Orderbook change notifications
Subscriber Pattern:
Arbitrage engine subscribes to all funding channels
Dashboard receives filtered opportunity alerts
Multiple consumers can subscribe simultaneously
Last updated

