-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcross_rate_matrix.py
More file actions
355 lines (286 loc) · 13.7 KB
/
cross_rate_matrix.py
File metadata and controls
355 lines (286 loc) · 13.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
"""Exchange rate matrix builder for cross-currency arbitrage detection."""
import time
import threading
from typing import Dict, List, Tuple, Optional, Set
from dataclasses import dataclass, field
from collections import defaultdict
from logger import logger
@dataclass
class ExchangeRate:
"""Represents an exchange rate between two currencies."""
from_currency: str
to_currency: str
rate: float
spread: float
timestamp: float
source_symbol: str = ""
fee: float = 0.001 # Default taker fee
class CrossRateMatrix:
"""Manages cross-currency exchange rates and finds arbitrage opportunities."""
def __init__(self):
self._rates: Dict[Tuple[str, str], ExchangeRate] = {}
self._currencies: Set[str] = set()
self._lock = threading.RLock()
self._last_update = 0.0
# Graph representation for pathfinding
self._graph: Dict[str, Dict[str, ExchangeRate]] = defaultdict(dict)
def update_from_symbol_prices(self, symbol_prices: Dict[str, Dict[str, float]],
markets: Dict[str, dict]):
"""Update cross rates from symbol price data."""
with self._lock:
# Clear old data
self._rates.clear()
self._currencies.clear()
self._graph.clear()
for symbol, price_data in symbol_prices.items():
if symbol not in markets:
continue
# Validate price data
if not isinstance(price_data, dict):
continue
market = markets[symbol]
base = market['base']
quote = market['quote']
# Validate base and quote
if not base or not quote:
continue
self._currencies.add(base)
self._currencies.add(quote)
bid = price_data.get('bid', 0)
ask = price_data.get('ask', 0)
timestamp = price_data.get('timestamp', time.time() * 1000)
# Validate numeric values
if not isinstance(bid, (int, float)) or not isinstance(ask, (int, float)):
continue
if bid is None or ask is None or bid <= 0 or ask <= 0:
continue
if bid > 0 and ask > 0:
# Base -> Quote (sell base for quote)
rate_bq = ExchangeRate(
from_currency=base,
to_currency=quote,
rate=bid, # We get quote at bid price
spread=ask - bid,
timestamp=timestamp,
source_symbol=symbol
)
# Quote -> Base (buy base with quote)
rate_qb = ExchangeRate(
from_currency=quote,
to_currency=base,
rate=1.0 / ask, # We pay ask price for base
spread=(ask - bid) / (ask * bid),
timestamp=timestamp,
source_symbol=symbol
)
# Store direct rates
self._rates[(base, quote)] = rate_bq
self._rates[(quote, base)] = rate_qb
# Update graph
self._graph[base][quote] = rate_bq
self._graph[quote][base] = rate_qb
self._last_update = time.time()
logger.info(f"Updated cross-rate matrix: {len(self._currencies)} currencies, "
f"{len(self._rates)} direct rates")
def get_direct_rate(self, from_currency: str, to_currency: str) -> Optional[ExchangeRate]:
"""Get direct exchange rate if available."""
with self._lock:
return self._rates.get((from_currency, to_currency))
def find_rate_path(self, from_currency: str, to_currency: str,
max_hops: int = 3) -> Optional[List[ExchangeRate]]:
"""Find exchange rate path using BFS."""
with self._lock:
if from_currency == to_currency:
return []
if from_currency not in self._currencies or to_currency not in self._currencies:
return None
# BFS to find shortest path
queue = [(from_currency, [])]
visited = {from_currency}
while queue:
current, path = queue.pop(0)
if len(path) >= max_hops:
continue
if current in self._graph:
for next_currency, rate in self._graph[current].items():
if next_currency == to_currency:
return path + [rate]
if next_currency not in visited and len(path) < max_hops - 1:
visited.add(next_currency)
queue.append((next_currency, path + [rate]))
return None
def calculate_cross_rate(self, from_currency: str, to_currency: str,
amount: float = 1.0) -> Optional[Tuple[float, List[ExchangeRate], float]]:
"""Calculate cross rate via best path, returns (final_amount, path, total_fee)."""
path = self.find_rate_path(from_currency, to_currency)
if not path:
return None
current_amount = amount
total_fee = 0.0
for rate in path:
# Apply rate
current_amount *= rate.rate
# Apply fee
fee_amount = current_amount * rate.fee
current_amount -= fee_amount
total_fee += rate.fee
return current_amount, path, total_fee
def find_arbitrage_cycles(self, base_currency: str = "USDT",
min_profit_threshold: float = 0.0001) -> List[Dict]:
"""Find profitable arbitrage cycles starting from base currency."""
with self._lock:
cycles = []
if base_currency not in self._currencies:
logger.warning(f"Base currency {base_currency} not found in matrix")
return cycles
# Find cycles of length 3 and 4
for cycle_length in [3, 4]:
found_cycles = self._find_cycles_dfs(base_currency, cycle_length)
for cycle in found_cycles:
profit_info = self._calculate_cycle_profit(cycle)
if profit_info and profit_info.get('profit_ratio', 0) > min_profit_threshold:
cycles.append(profit_info)
# Sort by profit ratio
cycles.sort(key=lambda x: x.get('profit_ratio', 0), reverse=True)
return cycles
def _find_cycles_dfs(self, start_currency: str, target_length: int) -> List[List[str]]:
"""Find cycles of specific length using DFS."""
cycles = []
# Validate input
if target_length is None or target_length < 3:
return cycles
def dfs(current: str, path: List[str], visited: Set[str]):
if len(path) == target_length:
if start_currency in self._graph.get(current, {}):
cycles.append(path + [start_currency])
return
if current in self._graph:
for next_currency in self._graph[current]:
if next_currency not in visited or (next_currency == start_currency and len(path) == target_length - 1):
if next_currency != start_currency or len(path) == target_length - 1:
new_visited = visited.copy()
new_visited.add(next_currency)
dfs(next_currency, path + [next_currency], new_visited)
dfs(start_currency, [start_currency], {start_currency})
return cycles
def _calculate_cycle_profit(self, cycle: List[str], initial_amount: float = 1.0) -> Optional[Dict]:
"""Calculate profit for a currency cycle."""
if len(cycle) < 3:
return None
current_amount = initial_amount
total_fees = 0.0
steps = []
for i in range(len(cycle) - 1):
from_curr = cycle[i]
to_curr = cycle[i + 1]
rate = self.get_direct_rate(from_curr, to_curr)
if not rate:
return None
# Apply exchange rate
new_amount = current_amount * rate.rate
# Apply fee
fee_amount = new_amount * rate.fee
final_amount = new_amount - fee_amount
# Determine trade side based on symbol format
# Example: BTC/USDT means BTC is base, USDT is quote
# If going USDT -> BTC, we BUY BTC with USDT
# If going BTC -> USDT, we SELL BTC for USDT
symbol_parts = rate.source_symbol.split('/')
if len(symbol_parts) == 2:
base_curr = symbol_parts[0]
quote_curr = symbol_parts[1]
# Determine side based on which currency we're acquiring
if to_curr == base_curr:
side = 'buy' # Buying the base currency
elif to_curr == quote_curr:
side = 'sell' # Selling the base currency for quote
else:
side = 'buy' # Default fallback
else:
side = 'buy' # Default fallback
steps.append({
'from': from_curr,
'to': to_curr,
'rate': rate.rate,
'side': side, # Added for trading
'fee': rate.fee,
'amount_before': current_amount,
'amount_after': final_amount,
'symbol': rate.source_symbol
})
current_amount = final_amount
total_fees += rate.fee
profit = current_amount - initial_amount
profit_ratio = profit / initial_amount if initial_amount > 0 else 0.0
# Ensure profit_ratio is a valid number
if profit_ratio is None or not isinstance(profit_ratio, (int, float)):
profit_ratio = 0.0
return {
'cycle': cycle,
'initial_amount': initial_amount,
'final_amount': current_amount,
'profit': profit,
'profit_ratio': float(profit_ratio),
'profit_percentage': float(profit_ratio) * 100,
'total_fees': total_fees,
'steps': steps,
'timestamp': self._last_update
}
def get_all_currencies(self) -> List[str]:
"""Get list of all available currencies."""
with self._lock:
return sorted(list(self._currencies))
def get_currency_pairs(self, currency: str) -> List[str]:
"""Get all currencies that can be directly exchanged with given currency."""
with self._lock:
if currency in self._graph:
return list(self._graph[currency].keys())
return []
def get_stats(self) -> Dict:
"""Get statistics about the cross-rate matrix."""
with self._lock:
age_seconds = time.time() - self._last_update if self._last_update > 0 else 0.0
return {
'currencies': len(self._currencies),
'direct_rates': len(self._rates),
'last_update': self._last_update,
'age_seconds': age_seconds
}
def print_matrix_summary(self):
"""Print a summary of the cross-rate matrix."""
stats = self.get_stats()
logger.info(f"Cross-Rate Matrix Summary:")
logger.info(f" Currencies: {stats['currencies']}")
logger.info(f" Direct rates: {stats['direct_rates']}")
logger.info(f" Last update: {stats['age_seconds']:.1f} seconds ago")
# Show some example rates
currencies = self.get_all_currencies()[:5] # First 5 currencies
logger.info("Sample rates:")
for from_curr in currencies:
for to_curr in currencies:
if from_curr != to_curr:
rate = self.get_direct_rate(from_curr, to_curr)
if rate:
logger.info(f" {from_curr}/{to_curr}: {rate.rate:.6f}")
if __name__ == "__main__":
# Test the cross-rate matrix
matrix = CrossRateMatrix()
# Simulate some price data
test_prices = {
'BTC/USDT': {'bid': 45000, 'ask': 45050, 'timestamp': time.time() * 1000},
'ETH/USDT': {'bid': 3000, 'ask': 3005, 'timestamp': time.time() * 1000},
'ETH/BTC': {'bid': 0.066, 'ask': 0.0661, 'timestamp': time.time() * 1000},
}
test_markets = {
'BTC/USDT': {'base': 'BTC', 'quote': 'USDT'},
'ETH/USDT': {'base': 'ETH', 'quote': 'USDT'},
'ETH/BTC': {'base': 'ETH', 'quote': 'BTC'},
}
matrix.update_from_symbol_prices(test_prices, test_markets)
matrix.print_matrix_summary()
# Test arbitrage detection
cycles = matrix.find_arbitrage_cycles("USDT")
logger.info(f"Found {len(cycles)} potential arbitrage cycles")
for cycle in cycles:
logger.info(f"Cycle: {' -> '.join(cycle['cycle'])}")
logger.info(f"Profit: {cycle['profit_percentage']:.4f}%")