Signal Modules
Signal modules provide async signal collection, processing, and caching for the trading agent.
SignalOrchestrator
Orchestrates concurrent signal collection from multiple providers.
Module: hyperliquid_agent.signals.orchestrator
Constructor
def __init__(self, config: dict | None = None) -> None
Initialize signal orchestrator with providers and collectors.
Parameters:
config(dict | None): Signal configuration dictionary with optional keys:
- collection_timeout_seconds: Global timeout override
- cache_db_path: Path to SQLite cache database
- enable_caching: Whether to enable caching (default: True)
- fast_timeout_seconds: Timeout for fast signals (default: 5.0)
- medium_timeout_seconds: Timeout for medium signals (default: 15.0)
- slow_timeout_seconds: Timeout for slow signals (default: 30.0)
Example:
from hyperliquid_agent.signals.orchestrator import SignalOrchestrator
config = {
"cache_db_path": "state/signal_cache.db",
"fast_timeout_seconds": 5.0,
"medium_timeout_seconds": 15.0,
"slow_timeout_seconds": 30.0
}
orchestrator = SignalOrchestrator(config)
Methods
collect_signals
async def collect_signals(
self,
request: SignalRequest
) -> SignalResponse
Collect signals based on request type with timeout and error handling.
Parameters:
request(SignalRequest): Signal collection request with signal_type and account_state
Returns:
SignalResponse: Collected signals and metadata
Raises:
asyncio.TimeoutError: If collection exceeds timeout
ValueError: If signal_type is unknown
Example:
from hyperliquid_agent.signals.service import SignalRequest
from datetime import datetime
request = SignalRequest(
signal_type="fast",
account_state=account_state,
timestamp=datetime.now()
)
response = await orchestrator.collect_signals(request)
print(f"Signals collected: {response.signals}")
print(f"Error: {response.error}")
collect_concurrent
async def collect_concurrent(
self,
requests: list[SignalRequest]
) -> list[SignalResponse]
Collect multiple signal types concurrently using asyncio.gather().
Parameters:
requests(list[SignalRequest]): List of signal collection requests
Returns:
list[SignalResponse]: List of responses (one per request)
Example:
requests = [
SignalRequest("fast", account_state, datetime.now()),
SignalRequest("medium", account_state, datetime.now()),
SignalRequest("slow", account_state, datetime.now())
]
responses = await orchestrator.collect_concurrent(requests)
for response in responses:
print(f"{response.signal_type}: {response.error or 'success'}")
get_health_status
def get_health_status(self) -> dict
Get health status of all providers for monitoring.
Returns:
dict: Health metrics for each provider and cache performance
shutdown
async def shutdown(self) -> None
Gracefully shut down the orchestrator and clean up resources.
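The orchestrator's concurrency model can be sketched without the package itself: each collection gets a per-type timeout via asyncio.wait_for, failures become error responses rather than raised exceptions, and asyncio.gather runs the collections concurrently. All names below (collect_one, fake_fetch, TIMEOUTS) are illustrative stand-ins, not part of hyperliquid_agent.

```python
import asyncio

# Hypothetical per-type budgets mirroring the documented defaults.
TIMEOUTS = {"fast": 5.0, "medium": 15.0, "slow": 30.0}

async def fake_fetch(signal_type: str) -> str:
    await asyncio.sleep(0.01)  # simulate provider I/O
    return f"{signal_type}-signals"

async def collect_one(signal_type: str) -> dict:
    """Stand-in for collecting one signal type; never raises."""
    try:
        # wait_for enforces the per-type budget, as the documented
        # *_timeout_seconds config keys suggest.
        value = await asyncio.wait_for(fake_fetch(signal_type), TIMEOUTS[signal_type])
        return {"signal_type": signal_type, "signals": value, "error": None}
    except asyncio.TimeoutError:
        return {"signal_type": signal_type, "signals": None, "error": "timeout"}

async def collect_concurrent(types: list[str]) -> list[dict]:
    # gather() preserves request order, matching "one response per request".
    return await asyncio.gather(*(collect_one(t) for t in types))

responses = asyncio.run(collect_concurrent(["fast", "medium", "slow"]))
```

Because collect_one converts failures into error responses, a slow provider degrades one response instead of failing the whole batch.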
SignalService
Bridge between synchronous governance and async signal collection.
Module: hyperliquid_agent.signals.service
Constructor
def __init__(self, config: dict | None = None) -> None
Initialize signal service.
Parameters:
config(dict | None): Signal configuration dictionary
Example:
from hyperliquid_agent.signals.service import SignalService
service = SignalService(config)
service.start()
Methods
start
def start(self) -> None
Start background thread with async event loop.
stop
def stop(self) -> None
Gracefully stop background thread.
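The start()/stop() thread pattern behind the service can be sketched in a few lines: a background thread owns an event loop, and synchronous callers submit coroutines to it with run_coroutine_threadsafe. The SyncAsyncBridge class here is an illustrative sketch, not the package's implementation.

```python
import asyncio
import threading

class SyncAsyncBridge:
    """Hypothetical sketch of a sync-to-async bridge via a background thread."""

    def start(self) -> None:
        # The background thread owns the loop and runs it forever.
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def stop(self) -> None:
        # Stop the loop from the calling thread, then join and close.
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()

    def run_sync(self, coro, timeout_seconds: float = 30.0):
        # Submit the coroutine to the background loop and block for the result,
        # which is what a collect_signals_sync-style method needs to do.
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        return future.result(timeout=timeout_seconds)

async def sample_collection() -> str:
    await asyncio.sleep(0.01)
    return "fast-signals"

bridge = SyncAsyncBridge()
bridge.start()
result = bridge.run_sync(sample_collection())
bridge.stop()
```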
collect_signals_sync
def collect_signals_sync(
self,
signal_type: Literal["fast", "medium", "slow"],
account_state: AccountState,
timeout_seconds: float = 30.0
) -> FastLoopSignals | MediumLoopSignals | SlowLoopSignals
Synchronous interface for the governance system to request signals.
Parameters:
signal_type(Literal): Type of signals to collect
account_state(AccountState): Current account state
timeout_seconds(float): Timeout for signal collection (default: 30.0)
Returns:
- Collected signals (type depends on signal_type)
Example:
# Start service
service = SignalService()
service.start()
# Collect signals synchronously
fast_signals = service.collect_signals_sync("fast", account_state)
print(f"Spreads: {fast_signals.spreads}")
# Stop service when done
service.stop()
Data Classes
SignalRequest
Request for signal collection.
Fields:
signal_type(Literal["fast", "medium", "slow"]): Type of signals to collect
account_state(AccountState): Current account state
timestamp(datetime): Request timestamp
SignalResponse
Response from signal collection.
Fields:
signal_type(Literal["fast", "medium", "slow"]): Type of signals collected
signals(FastLoopSignals | MediumLoopSignals | SlowLoopSignals): Collected signals
timestamp(datetime): Collection timestamp
error(str | None): Error message if collection failed
SQLiteCacheLayer
SQLite-based caching layer with TTL management.
Module: hyperliquid_agent.signals.cache
Constructor
def __init__(
self,
db_path: Path | str = "state/signal_cache.db"
) -> None
Initialize SQLite cache layer.
Parameters:
db_path(Path | str): Path to SQLite database file (default: "state/signal_cache.db")
Example:
from hyperliquid_agent.signals.cache import SQLiteCacheLayer
from pathlib import Path
cache = SQLiteCacheLayer(Path("state/signal_cache.db"))
Methods
get
async def get(self, key: str) -> CacheEntry | None
Retrieve cached value if not expired.
Parameters:
key(str): Cache key
Returns:
CacheEntry | None: Entry with value and age, or None if not found/expired
Example:
entry = await cache.get("orderbook:BTC")
if entry:
print(f"Value: {entry.value}")
print(f"Age: {entry.age_seconds}s")
set
async def set(
self,
key: str,
value: Any,
ttl_seconds: int
) -> None
Store value with TTL.
Parameters:
key(str): Cache key
value(Any): Value to cache (must be picklable)
ttl_seconds(int): Time-to-live in seconds
Example:
await cache.set("orderbook:BTC", orderbook_data, ttl_seconds=60)
invalidate
async def invalidate(self, pattern: str) -> None
Invalidate cache entries matching pattern (SQL LIKE syntax).
Parameters:
pattern(str): SQL LIKE pattern (e.g., "orderbook:%" for all orderbook entries)
Example:
# Invalidate all orderbook entries
await cache.invalidate("orderbook:%")
# Invalidate all BTC-related entries
await cache.invalidate("%:BTC")
cleanup_expired
async def cleanup_expired(self) -> None
Remove expired entries (run periodically).
get_metrics
def get_metrics(self) -> CacheMetrics
Return cache hit rate and other metrics.
Returns:
CacheMetrics: Performance statistics
Example:
metrics = cache.get_metrics()
print(f"Hit rate: {metrics.hit_rate:.2f}%")
print(f"Total entries: {metrics.total_entries}")
print(f"Avg age: {metrics.avg_age_seconds:.1f}s")
Data Classes
CacheEntry
Cache entry with value and age metadata.
Fields:
value(Any): Cached value
age_seconds(float): Age of entry in seconds
CacheMetrics
Cache performance metrics.
Fields:
total_entries(int): Total valid entries
total_hits(int): Total cache hits
avg_hits_per_entry(float): Average hits per entry
hit_rate(float): Hit rate percentage
total_misses(int): Total cache misses
avg_age_seconds(float): Average age of entries
expired_entries(int): Count of expired entries
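Since hit_rate is documented as a percentage, it presumably relates total_hits and total_misses as below; the zero-lookup guard is an assumption, not confirmed package behavior.

```python
def hit_rate(total_hits: int, total_misses: int) -> float:
    """Hit rate as a percentage of all lookups; 0.0 when no lookups yet."""
    lookups = total_hits + total_misses
    return 100.0 * total_hits / lookups if lookups else 0.0

rate = hit_rate(total_hits=90, total_misses=10)
```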
DataProvider
Abstract base class for all data providers.
Module: hyperliquid_agent.signals.providers
Constructor
def __init__(self) -> None
Initialize provider with circuit breaker and retry config.
Abstract Methods
fetch
@abstractmethod
async def fetch(self, **kwargs) -> ProviderResponse
Fetch data from the provider.
Parameters:
**kwargs: Provider-specific parameters
Returns:
ProviderResponse: Data with quality metadata
Raises:
Exception: If fetch fails after all retries
get_cache_ttl
@abstractmethod
def get_cache_ttl(self) -> int
Return cache TTL in seconds for this provider's data.
get_provider_name
@abstractmethod
def get_provider_name(self) -> str
Return provider identifier for logging and metrics.
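A concrete provider supplies all three abstract methods. The sketch below mirrors the documented interface with a minimal local ABC so it runs standalone; OrderbookProvider and its return values are hypothetical, and the real base class (with circuit breaker and retry wiring) is hyperliquid_agent.signals.providers.DataProvider.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any

class DataProvider(ABC):
    """Local mirror of the documented abstract interface."""
    @abstractmethod
    async def fetch(self, **kwargs) -> Any: ...
    @abstractmethod
    def get_cache_ttl(self) -> int: ...
    @abstractmethod
    def get_provider_name(self) -> str: ...

class OrderbookProvider(DataProvider):  # hypothetical concrete provider
    async def fetch(self, **kwargs) -> Any:
        # Stand-in payload; a real provider would call an exchange API here.
        return {"bids": [], "asks": []}

    def get_cache_ttl(self) -> int:
        return 60  # orderbook data goes stale quickly

    def get_provider_name(self) -> str:
        return "orderbook"

provider = OrderbookProvider()
payload = asyncio.run(provider.fetch())
```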
Methods
fetch_with_circuit_breaker
async def fetch_with_circuit_breaker(
self,
fetch_func: Callable[[], Any]
) -> ProviderResponse
Execute fetch with circuit breaker protection.
Parameters:
fetch_func(Callable): Async callable that performs the actual fetch
Returns:
ProviderResponse: Result from fetch_func
Raises:
RuntimeError: If circuit breaker is open
Exception: If fetch fails after retries
get_health_status
def get_health_status(self) -> dict[str, Any]
Get provider health status for monitoring.
Returns:
dict: Health metrics including circuit state and failure count
Data Classes
ProviderResponse
Standardized response from data providers with quality metadata.
Fields:
data(T): The actual data payload
timestamp(datetime): When the data was collected
source(str): Provider identifier
confidence(float): Data quality score (0.0 to 1.0)
is_cached(bool): Whether data came from cache
cache_age_seconds(float | None): Age of cached data
RetryConfig
Configuration for exponential backoff retry logic.
Fields:
max_attempts(int): Maximum retry attempts (default: 3)
backoff_factor(float): Exponential backoff multiplier (default: 2.0)
initial_delay_seconds(float): Initial delay before first retry (default: 1.0)
max_delay_seconds(float): Maximum delay between retries (default: 10.0)
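The fields imply a delay schedule: exponential growth from initial_delay_seconds by backoff_factor, capped at max_delay_seconds. The formula below is a sketch of that implication, not the package's exact code.

```python
def backoff_delays(max_attempts: int, initial: float, factor: float, cap: float) -> list[float]:
    """Delays between attempts: initial * factor**n, capped. One fewer
    delay than attempts, since the last attempt is not followed by a wait."""
    return [min(initial * factor**n, cap) for n in range(max_attempts - 1)]

# With the documented defaults (3 attempts, 2.0x factor, 1.0s initial):
delays = backoff_delays(max_attempts=3, initial=1.0, factor=2.0, cap=10.0)
```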
CircuitBreaker
Circuit breaker for handling sustained provider failures.
Methods:
record_success(): Record successful call, reset failure count
record_failure(): Record failed call, potentially open circuit
can_attempt(): Check if call should be attempted
get_state(): Get current circuit state
States:
CLOSED: Normal operation, all requests allowed
OPEN: Provider failing, reject requests immediately
HALF_OPEN: Testing recovery, allow limited requests
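The state transitions can be sketched as a minimal state machine. The failure threshold and the omission of the timed OPEN-to-HALF_OPEN transition are simplifications; the real class also tracks a recovery cooldown before allowing test requests.

```python
class CircuitBreaker:
    """Illustrative sketch of the documented CLOSED/OPEN behavior."""

    def __init__(self, failure_threshold: int = 3) -> None:
        self.failure_threshold = failure_threshold  # assumed, not documented
        self.failure_count = 0
        self.state = "CLOSED"

    def record_success(self) -> None:
        # Success resets the failure count and closes the circuit.
        self.failure_count = 0
        self.state = "CLOSED"

    def record_failure(self) -> None:
        # Enough consecutive failures trip the breaker open.
        self.failure_count += 1
        if self.failure_count >= self.failure_threshold:
            self.state = "OPEN"

    def can_attempt(self) -> bool:
        # A real implementation moves OPEN -> HALF_OPEN after a cooldown
        # to probe recovery; this sketch simply rejects while OPEN.
        return self.state != "OPEN"

breaker = CircuitBreaker(failure_threshold=2)
breaker.record_failure()
allowed_before = breaker.can_attempt()  # one failure: still CLOSED
breaker.record_failure()
allowed_after = breaker.can_attempt()   # threshold hit: OPEN
breaker.record_success()
recovered = breaker.can_attempt()       # success closes the circuit
```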
Utility Functions
fetch_with_retry
async def fetch_with_retry(
fetch_func: Callable[[], Any],
retry_config: RetryConfig,
operation_name: str = "fetch"
) -> Any
Execute async fetch with exponential backoff retry.
Parameters:
fetch_func(Callable): Async callable to execute
retry_config(RetryConfig): Retry configuration
operation_name(str): Name for logging purposes
Returns:
- Result from fetch_func
Raises:
Exception: If all retry attempts fail
Example:
from hyperliquid_agent.signals.providers import (
fetch_with_retry,
RetryConfig
)
async def fetch_data():
# Your fetch logic here
return await api.get_data()
config = RetryConfig(max_attempts=3, backoff_factor=2.0)
result = await fetch_with_retry(fetch_data, config, "my_operation")
See Also
- Core Modules - Main agent and execution
- Governance Modules - Strategy governance
- Signals Architecture - Detailed signal design
- Performance Tuning - Cache optimization