Compare commits

5 Commits

Author SHA1 Message Date
159095ce5a chore: finalize file rename by removing old CoreTraidMath.py
Complete the file rename by removing the old file with typo.

This commit ensures the git history properly tracks the rename from
CoreTraidMath.py to CoreTradeMath.py.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-24 18:40:28 +01:00
92eb7db4c5 refactor: update test file to use new standardized method names
Update test_decision.py to work with refactored core modules.

Changes:
- Removed wildcard imports, using explicit imports:
  - from decisionManager_v2 import DecisionManager
  - from indicators_v2 import ind_BB
  - from signals_v2 import sig_BB
- Updated method calls to use snake_case naming:
  - test.getRetroTrendAns() → test.get_retro_trend_answer()
  - test.generateMatrixProbabilityFromDict() → test.generate_matrix_probability_from_dict()
  - test.getOnlineAns() → test.get_online_answer()
- Updated variable names to snake_case:
  - sigAgrReq → sig_agr_req
  - sigAgrRetroTemplate → sig_agr_retro_template
  - retroAns → retro_ans
  - sigAgrData → sig_agr_data
- Improved spacing and formatting for PEP 8 compliance

The test file now follows the same coding standards as the refactored
core modules and maintains compatibility with all renamed methods.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-24 18:38:42 +01:00
bfa0d13a25 refactor: standardize DecisionManager, DealManager, and RiskManager
Complete standardization of remaining core management classes.

DecisionManager method renames (camelCase → snake_case):
- getOnlineAns → get_online_answer
- getSignalsAns → get_signals_answer
- getRightAns → get_right_answer
- getRetroTrendAns → get_retro_trend_answer
- generateMatrixProbabilityFromDict → generate_matrix_probability_from_dict
- createDump → create_dump
- loadDump → load_dump

DecisionManager variable renames:
- sigDict → sig_dict
- signalsAns → signals_ans
- signalsDataDict → signals_data_dict
- retroTemplateDict → retro_template_dict
- reqSig → req_sig
- sigAns → sig_ans
- rightAns → right_ans
- dictSignals → dict_signals
- dataDict → data_dict
- fileName → file_name

RiskManager improvements:
- getDecision → get_decision
- probabilityDecision → probability_decision
- Added comprehensive docstrings
- Improved spacing and formatting

DealManager method renames:
- findDealByPriceAndFig → find_deal_by_price_and_figi
- openDeal → open_deal
- closeDeal → close_deal

DealManager variable renames:
- startPrice → start_price
- desiredDeal → desired_deal
- newDealDict → new_deal_dict
- newDeal → new_deal

Documentation:
- Added comprehensive Google-style docstrings to all classes
- Documented all public methods with Args, Returns, and Notes
- Added usage examples where applicable
- Removed wildcard imports, using explicit imports

Follows: PEP 8, Google Python Style Guide

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-24 18:36:24 +01:00
00c7614bfc fix: correct class name typos and variable naming issues
Fix critical typos in class names and variables that could cause confusion
and runtime errors.

Class name fixes:
- trandeVoter → TradeVoter (market_trade/core/trandeVoter.py)
- decsionManager → DecisionManager (market_trade/core/decisionManager_v2.py)
- coreSignalTrande → CoreSignalTrade (market_trade/core/signals_v2.py)
- coreIndicator → CoreIndicator (market_trade/core/indicators_v2.py)
- indicatorsAgrigator → IndicatorsAggregator (indicators_v2.py)
- signalsAgrigator → SignalsAggregator (signals_v2.py)
- riskManager → RiskManager (market_trade/core/riskManager.py)

Variable typo fixes:
- commision → commission (riskManager.py, lines 8-9, 24)
- probabilityDecsion → probability_decision (decisionManager_v2.py:84)

Type hint corrections:
- Fixed pd.DataFrame() → pd.DataFrame (incorrect syntax in 4 files)

Bug fixes:
- Fixed mutable default argument antipattern in indicators_v2.py:33
  (indDict={} → indDict=None)
- Fixed mutable default argument in CoreTradeMath.py:22
  (params={} → params=None)

All class references updated throughout the codebase to maintain
consistency.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-24 18:34:51 +01:00
bbba7bfe89 refactor: rename CoreTraidMath.py to CoreTradeMath.py
Fix typo in core math module filename and update all references.

Changes:
- Renamed market_trade/core/CoreTraidMath.py → CoreTradeMath.py
- Updated 28 import references across 14 files:
  - All Ind_*.py indicator modules
  - indicators.py, indicators_v2.py
  - signals.py, signals_v2.py
  - CoreDraw.py
- Updated documentation references in CLAUDE.md

This eliminates the "Traid" typo and aligns with proper English spelling.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-24 18:33:21 +01:00
23 changed files with 838 additions and 485 deletions

151
CLAUDE.md Normal file
View File

@@ -0,0 +1,151 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Project Overview
This is a Python-based algorithmic trading system for financial markets that implements technical indicator analysis, signal generation, decision making, and risk management. It integrates with Tinkoff Invest API for market data and trading.
## Development Setup
### Environment Setup
- Python 3.9-3.12 (managed via Poetry)
- Install dependencies: `poetry install`
- Activate virtual environment: `poetry shell`
- Environment variables are in `.env` (contains Tinkoff API tokens)
### Docker Development
- Build image: `docker build -f dockerfiles/Dockerfile -t market-trade .`
- Main Dockerfile uses Poetry 1.7.1 and Python 3.11
- Requires SSH mount for private tinkoff-grpc dependency
### Running Tests
- Test files located in `market_trade/tests/`
- Run test: `python market_trade/tests/test_decision.py`
- Run specific test: `python market_trade/tests/test_dataloader.py`
### Data Tools
Scripts in `tools/` directory for data collection:
- `save_currencies_data.py` - Collect currency market data
- `save_shares_data.py` - Collect stock market data
- `get_shares_stats.py` - Generate trading statistics
- Usage: `python tools/<script_name>.py [options]`
## Architecture
### Core Trading Pipeline (docs/trading-flow.md)
The system follows this data flow:
1. **SELECT INSTRUMENT** - Choose trading instrument
2. **GET_CANDLES(10000)** - Fetch historical candlestick data
3. **RETRO TRAINING** - Backtest signals on historical data
4. **STREAM PROCESSING**:
- Receive real-time market messages
- Accumulate data in sliding window
- Update window with each new message
- Generate trading signals
### Module Structure
#### `market_trade/core/` - Core Trading Logic
**Signal Processing Chain:**
1. **Indicators** (`indicators.py`, `indicators_v2.py`) - Technical indicator calculation
- Base class: `coreIndicator`
- Bollinger Bands: `ind_BB`
- All indicator classes (Ind_*.py): ADX, Alligator, DonchianChannel, Envelopes, Gator, Ishimoku, LRI, STD, Stochastic, bollingerBands
2. **Signals** (`signals.py`, `signals_v2.py`) - Signal generation from indicators
- Base class: `coreSignalTrande` with three modes:
- `online` - Real-time signal generation
- `retro` - Expanding window backtesting
- `retroFast` - Sliding window backtesting
- Signal implementations: `signal_BB` (Bollinger Bands signal)
- Aggregator: `signalAgrigator` manages multiple signal instances
3. **Decision Manager** (`decisionManager.py`, `decisionManager_v2.py`) - Trading decisions
- Class: `decsionManager`
- Combines signals from `signalAgrigator`
- Uses `trandeVoter` for probability matrix generation
- Methods:
- `getSignalTest()` - Test signal generation
- `generateMatrixProbability()` - Create probability matrices from backtest
- `getOnlineAns()` - Real-time decision making
4. **Trade Voter** (`trandeVoter.py`) - Probability-based decision weighting
- Generates probability matrices from historical signal performance
- Weights multiple signals to produce final decision
5. **Risk Manager** (`riskManager.py`) - Position sizing and risk controls
- Class: `riskManager`
- Combines signal decisions with risk parameters
6. **Deal Manager** (`dealManager.py`) - Trade execution and management
- Class: `DealManager`
- Manages active positions and orders
**Helper Modules:**
- `CoreTradeMath.py` - Mathematical operations for indicators (moving averages, STD)
- `CoreDraw.py` - Visualization utilities for indicators and signals
#### `market_trade/data/` - Data Loading
- `dataloader.py` - Contains `DukaMTInterface` class
- Converts Dukascopy format candlestick data to internal format
- Separates bid/ask candlesticks from multi-indexed CSV
- Handles both file paths and DataFrames
#### `market_trade/tests/` - Testing
- Test files demonstrate usage patterns:
- `test_decision.py` - Shows complete decision manager workflow with retro training
- `test_dataloader.py` - Data loading tests
### External Dependencies
- **tinkoff-grpc** - Private GitHub repo for Tinkoff Invest API integration
- Located at: `git@github.com:strategy155/tinkoff_grpc.git`
- Used in tools for market data collection
- **Data Analysis**: pandas, numpy, scipy, matplotlib, plotly, mplfinance
- **Web Scraping**: requests-html, beautifulsoup4, selenium
- **Development**: JupyterLab (notebooks in `notebooks/`)
## Key Constants (market_trade/constants.py)
- `ROOT_PATH` - Project root directory
- `CANDLESTICK_DATASETS_PATH` - Path to candlestick data: `data/candlesticks/`
- `TEST_CANDLESTICKS_PATH` - Test dataset: `data/EURUSD_price_candlestick.csv`
- `TINKOFF_TOKEN_STRING` - Production API token (from .env)
- `SANDBOX_TOKEN_STRING` - Sandbox API token (from .env)
- `TINKOFF_API_ADDRESS` - API endpoint: 'invest-public-api.tinkoff.ru:443'
## Data Formats
### Candlestick Data
Expected DataFrame columns:
- `date` - Timestamp
- `open`, `high`, `low`, `close` - OHLC price data
- For bid/ask data: Multi-indexed with ('bid'/'ask', 'open'/'high'/'low'/'close')
### Signal Configuration Dictionary
```python
{
'signal_name': {
'className': signal_class, # e.g., sig_BB
'indParams': {...}, # Indicator parameters
'signalParams': { # Signal parameters
'source': 'close', # Source price column
'target': 'close' # Target price column for analysis
},
'batchSize': 30 # Window size
}
}
```
## Development Notes
- Code contains Russian comments and variable names (e.g., "агрегатор", "индикаторы")
- Version 2 modules (`*_v2.py`) represent newer implementations
- The system uses sliding window approach for real-time signal generation
- Backtesting generates probability matrices that weight signal reliability
- Data symlink: `data/` -> `/var/data0/markettrade_data`

0
docs/trading-flow.md Normal file
View File

View File

@@ -25,7 +25,7 @@ from plotly.offline import init_notebook_mode, iplot
from plotly.subplots import make_subplots from plotly.subplots import make_subplots
init_notebook_mode() init_notebook_mode()
import market_trade.core.CoreTraidMath import market_trade.core.CoreTradeMath
import plotly.express as px import plotly.express as px

View File

@@ -19,15 +19,14 @@ import datetime
class CoreMath: class CoreMath:
def __init__(self, base_df, params={ def __init__(self, base_df, params=None):
'dataType':'ohcl', default_params = {
'action': None, 'dataType':'ohcl',
'actionOptions':{} 'action': None,
} 'actionOptions':{}
): }
self.base_df=base_df.reset_index(drop=True) self.base_df=base_df.reset_index(drop=True)
self.params=params self.params=params if params is not None else default_params
if self.params['dataType']=='ohcl': if self.params['dataType']=='ohcl':
self.col=self.base_df[self.params['actionOptions']['valueType']] self.col=self.base_df[self.params['actionOptions']['valueType']]
elif self.params['dataType']=='series': elif self.params['dataType']=='series':

View File

@@ -25,7 +25,7 @@ from plotly.offline import init_notebook_mode, iplot
from plotly.subplots import make_subplots from plotly.subplots import make_subplots
import market_trade.core.CoreTraidMath import market_trade.core.CoreTradeMath
import market_trade.core.CoreDraw import market_trade.core.CoreDraw
init_notebook_mode() init_notebook_mode()
@@ -82,7 +82,7 @@ class ADXI:
'action':'findMean', 'action':'findMean',
'actionOptions':{'MeanType':'EMA','span':10} 'actionOptions':{'MeanType':'EMA','span':10}
} }
ans=np.asarray(CoreTraidMath.CoreMath(ser,op).ans) ans=np.asarray(CoreTradeMath.CoreMath(ser,op).ans)
#print(ans) #print(ans)
#ans = np.asarray(ser.ewm(span=40,adjust=False).mean().to_list()) #ans = np.asarray(ser.ewm(span=40,adjust=False).mean().to_list())
#print(ans) #print(ans)

View File

@@ -24,7 +24,7 @@ from plotly.offline import init_notebook_mode, iplot
from plotly.subplots import make_subplots from plotly.subplots import make_subplots
init_notebook_mode() init_notebook_mode()
import market_trade.core.CoreTraidMath import market_trade.core.CoreTradeMath
import market_trade.core.CoreDraw import market_trade.core.CoreDraw
@@ -46,7 +46,7 @@ class Alligator:
'valueType':self.options['valueType'], 'valueType':self.options['valueType'],
'window':self.options[keyAns]['window']} 'window':self.options[keyAns]['window']}
} }
ans=market_trade.core.CoreTraidMath.CoreMath(self.base_df,op).ans ans=market_trade.core.CoreTradeMath.CoreMath(self.base_df,op).ans
return ans return ans

View File

@@ -24,7 +24,7 @@ from plotly.offline import init_notebook_mode, iplot
from plotly.subplots import make_subplots from plotly.subplots import make_subplots
init_notebook_mode() init_notebook_mode()
import market_trade.core.CoreTraidMath import market_trade.core.CoreTradeMath
import market_trade.core.CoreDraw import market_trade.core.CoreDraw
@@ -62,9 +62,9 @@ class IDC:
} }
for i in range(self.options['window'],len(self.base_df)-self.options['shift']+1): for i in range(self.options['window'],len(self.base_df)-self.options['shift']+1):
ans['MaxExt'].append(CoreTraidMath.CoreMath(self.base_df[i-self.options['window']:i],opMax).ans) ans['MaxExt'].append(CoreTradeMath.CoreMath(self.base_df[i-self.options['window']:i],opMax).ans)
ans['x'].append(self.base_df['date'][i-1+self.options['shift']]) ans['x'].append(self.base_df['date'][i-1+self.options['shift']])
ans['MinExt'].append(CoreTraidMath.CoreMath(self.base_df[i-self.options['window']:i],opMin).ans) ans['MinExt'].append(CoreTradeMath.CoreMath(self.base_df[i-self.options['window']:i],opMin).ans)
return ans return ans

View File

@@ -24,7 +24,7 @@ from plotly.offline import init_notebook_mode, iplot
from plotly.subplots import make_subplots from plotly.subplots import make_subplots
init_notebook_mode() init_notebook_mode()
import market_trade.core.CoreTraidMath import market_trade.core.CoreTradeMath
import market_trade.core.CoreDraw import market_trade.core.CoreDraw
class Envelopes: class Envelopes:
@@ -64,7 +64,7 @@ class Envelopes:
} }
if dictResp['MeanType']=='SMA': if dictResp['MeanType']=='SMA':
y=market_trade.core.CoreTraidMath.CoreMath(self.base_df,op).ans y=market_trade.core.CoreTradeMath.CoreMath(self.base_df,op).ans
ans['MainEnv']=y[:len(y)-self.options['shift']] ans['MainEnv']=y[:len(y)-self.options['shift']]
ans['PlusEnv']=ans['MainEnv']*(1+self.options['kProc']/100) ans['PlusEnv']=ans['MainEnv']*(1+self.options['kProc']/100)
ans['MinusEnv']=ans['MainEnv']*(1-self.options['kProc']/100) ans['MinusEnv']=ans['MainEnv']*(1-self.options['kProc']/100)

View File

@@ -24,7 +24,7 @@ from plotly.offline import init_notebook_mode, iplot
from plotly.subplots import make_subplots from plotly.subplots import make_subplots
init_notebook_mode() init_notebook_mode()
import market_trade.core.CoreTraidMath import market_trade.core.CoreTradeMath
import market_trade.core.CoreDraw import market_trade.core.CoreDraw
import market_trade.core.Ind_Alligator import market_trade.core.Ind_Alligator

View File

@@ -25,7 +25,7 @@ from plotly.offline import init_notebook_mode, iplot
from plotly.subplots import make_subplots from plotly.subplots import make_subplots
import market_trade.core.CoreDraw import market_trade.core.CoreDraw
init_notebook_mode() init_notebook_mode()
import market_trade.core.CoreTraidMath import market_trade.core.CoreTradeMath
import plotly.express as px import plotly.express as px

View File

@@ -25,7 +25,7 @@ from plotly.offline import init_notebook_mode, iplot
from plotly.subplots import make_subplots from plotly.subplots import make_subplots
init_notebook_mode() init_notebook_mode()
import market_trade.core.CoreTraidMath import market_trade.core.CoreTradeMath
import market_trade.core.CoreDraw import market_trade.core.CoreDraw

View File

@@ -25,7 +25,7 @@ from plotly.offline import init_notebook_mode, iplot
from plotly.subplots import make_subplots from plotly.subplots import make_subplots
init_notebook_mode() init_notebook_mode()
import market_trade.core.CoreTraidMath import market_trade.core.CoreTradeMath
import market_trade.core.CoreDraw import market_trade.core.CoreDraw
class ISTD: class ISTD:
@@ -53,7 +53,7 @@ class ISTD:
'actionOptions':{'valueType':self.options['valueType']} 'actionOptions':{'valueType':self.options['valueType']}
} }
x=self.base_df['date'].to_list() x=self.base_df['date'].to_list()
y= CoreTraidMath.CoreMath(self.base_df,op).ans y= CoreTradeMath.CoreMath(self.base_df,op).ans
ans={'y':y,'x':x} ans={'y':y,'x':x}

View File

@@ -25,7 +25,7 @@ from plotly.offline import init_notebook_mode, iplot
from plotly.subplots import make_subplots from plotly.subplots import make_subplots
init_notebook_mode() init_notebook_mode()
import market_trade.core.CoreTraidMath import market_trade.core.CoreTradeMath
import market_trade.core.CoreDraw import market_trade.core.CoreDraw
class Stochastic: class Stochastic:
@@ -69,7 +69,7 @@ class Stochastic:
'action':'findMean', 'action':'findMean',
'actionOptions':{'MeanType':'SMA','window':self.options['windowSMA']} 'actionOptions':{'MeanType':'SMA','window':self.options['windowSMA']}
} }
ans=np.asarray(market_trade.core.CoreTraidMath.CoreMath(ser,op).ans) ans=np.asarray(market_trade.core.CoreTradeMath.CoreMath(ser,op).ans)
return ans return ans
#return np.convolve(col, np.ones(self.options['windowSMA']), 'valid') /self.options['windowSMA'] #return np.convolve(col, np.ones(self.options['windowSMA']), 'valid') /self.options['windowSMA']

View File

@@ -24,7 +24,7 @@ from plotly.offline import init_notebook_mode, iplot
from plotly.subplots import make_subplots from plotly.subplots import make_subplots
init_notebook_mode() init_notebook_mode()
import market_trade.core.CoreTraidMath import market_trade.core.CoreTradeMath
import market_trade.core.CoreDraw import market_trade.core.CoreDraw
@@ -50,12 +50,12 @@ class BB:
'window':self.options['window'] 'window':self.options['window']
} }
} }
ans['BB']=market_trade.core.CoreTraidMath.CoreMath(self.base_df,opMA).ans ans['BB']=market_trade.core.CoreTradeMath.CoreMath(self.base_df,opMA).ans
opSTD={'dataType':'ohcl', opSTD={'dataType':'ohcl',
'action':'findSTD', 'action':'findSTD',
'actionOptions':{'valueType':self.options['valueType'],'window':self.options['window']} 'actionOptions':{'valueType':self.options['valueType'],'window':self.options['window']}
} }
ans['STD']=market_trade.core.CoreTraidMath.CoreMath(self.base_df,opSTD).ans ans['STD']=market_trade.core.CoreTradeMath.CoreMath(self.base_df,opSTD).ans
ans['pSTD']=ans['BB']+ans['STD']*self.options['kDev'] ans['pSTD']=ans['BB']+ans['STD']*self.options['kDev']
ans['mSTD']=ans['BB']-ans['STD']*self.options['kDev'] ans['mSTD']=ans['BB']-ans['STD']*self.options['kDev']
ans['x']=np.array(self.base_df['date'][self.options['window']-1:].to_list()) ans['x']=np.array(self.base_df['date'][self.options['window']-1:].to_list())

View File

@@ -1,49 +1,77 @@
import pandas as pd import pandas as pd
import datetime import datetime
import numpy as np import numpy as np
import uuid import uuid
class DealManager(): class DealManager():
"""Manages open trading positions and deal lifecycle.
Tracks active positions with their entry prices and quantities,
supporting both opening new positions and closing existing ones.
"""
def __init__(self): def __init__(self):
#self.commission=0.04 """Initialize DealManager with empty deals DataFrame."""
self.columns=['uuid','figi','amount','startPrice'] self.columns = ['uuid', 'figi', 'amount', 'startPrice']
self.deals = pd.DataFrame(columns=self.columns) self.deals = pd.DataFrame(columns=self.columns)
self.deals = self.deals.set_index('uuid') self.deals = self.deals.set_index('uuid')
def findDealByPriceAndFig(self,price,figi): def find_deal_by_price_and_figi(self, price: float, figi: str):
ans=None """Find existing deal by price and instrument identifier.
Args:
price: Entry price to search for.
figi: Financial Instrument Global Identifier.
Returns:
Deal UUID if found, None otherwise.
"""
ans = None
for i in range(self.deals.shape[0]): for i in range(self.deals.shape[0]):
if self.deals.iloc[i].startPrice == price and self.deals.iloc[i].figi == figi: if self.deals.iloc[i].startPrice == price and self.deals.iloc[i].figi == figi:
ans = self.deals.iloc[i].name ans = self.deals.iloc[i].name
break break
return ans return ans
def openDeal(self,figi,startPrice,amount=1): def open_deal(self, figi: str, start_price: float, amount: int = 1) -> None:
desiredDeal=self.findDealByPriceAndFig(startPrice,figi) """Open new deal or add to existing position.
if desiredDeal == None:
newDealDict={ If a deal with the same FIGI and price exists, adds to the amount.
'uuid':[str(uuid.uuid4())], Otherwise creates a new deal entry.
'figi':[figi],
'startPrice':[startPrice], Args:
'amount':[amount] figi: Financial Instrument Global Identifier.
start_price: Entry price for the position.
amount: Number of units to trade (default 1).
"""
desired_deal = self.find_deal_by_price_and_figi(start_price, figi)
if desired_deal is None:
new_deal_dict = {
'uuid': [str(uuid.uuid4())],
'figi': [figi],
'startPrice': [start_price],
'amount': [amount]
} }
#newDealDict['profit']=[startPrice*pow(1+self.commission,2)] new_deal = pd.DataFrame.from_dict(new_deal_dict).set_index('uuid')
self.deals = pd.concat([self.deals, new_deal])
newDeal=pd.DataFrame.from_dict(newDealDict).set_index('uuid')
self.deals=pd.concat([self.deals, newDeal])
else: else:
self.deals.at[desiredDeal,'amount'] += amount self.deals.at[desired_deal, 'amount'] += amount
def closeDeal(self,uuid,amount): def close_deal(self, uuid_str: str, amount: int) -> None:
"""Close deal partially or completely.
desiredDeal=self.deals.loc[uuid]
if desiredDeal.amount - amount == 0: Args:
self.deals = self.deals.drop(labels = [uuid],axis = 0) uuid_str: Deal UUID to close.
amount: Number of units to close.
Note:
If amount equals deal amount, removes deal entirely.
Otherwise decreases deal amount.
"""
desired_deal = self.deals.loc[uuid_str]
if desired_deal.amount - amount == 0:
self.deals = self.deals.drop(labels=[uuid_str], axis=0)
else: else:
self.deals.at[uuid,'amount'] -= amount self.deals.at[uuid_str, 'amount'] -= amount
#self.deals.loc[uuid].amount = desiredDeal.amount - amount

View File

@@ -1,4 +1,5 @@
import os import os
import pickle
import pandas as pd import pandas as pd
import datetime import datetime
@@ -6,156 +7,181 @@ import numpy as np
from tqdm import tqdm from tqdm import tqdm
from market_trade.core.indicators_v2 import * from market_trade.core.indicators_v2 import ind_BB
from market_trade.core.signals_v2 import * from market_trade.core.signals_v2 import sig_BB, SignalsAggregator
from market_trade.core.dealManager import * from market_trade.core.dealManager import DealManager
from market_trade.core.trandeVoter import * from market_trade.core.trandeVoter import TradeVoter
from market_trade.core.riskManager import * from market_trade.core.riskManager import RiskManager
import pickle
class decsionManager: class DecisionManager:
''' """Manages trading decisions based on signals, probability voting, and risk management.
sigAgrReq = {
'sig_BB':{ Coordinates the entire decision-making pipeline:
'className':sig_BB, 1. Signals from indicators
'params':{'source':'close','target':'close'}, 2. Probability-based voting (TradeVoter)
'indicators':{ 3. Risk assessment (RiskManager)
'ind_BB':{ 4. Deal tracking (DealManager)
'className':ind_BB,
'params':{'MeanType':'SMA','window':30,'valueType':'close','kDev':2.5} Example configuration:
} sig_config = {
'sig_BB': {
'className': sig_BB,
'params': {'source': 'close', 'target': 'close'},
'indicators': {
'ind_BB': {
'className': ind_BB,
'params': {'MeanType': 'SMA', 'window': 30, 'valueType': 'close', 'kDev': 2.5}
}
}
}
} }
}, """
'sig_BB_2':{
'className':sig_BB,
'params':{'source':'close','target':'close'},
'indicators':{
'ind_BB':{
'className':ind_BB,
'params':{'MeanType':'SMA','window':30,'valueType':'close','kDev':2}
}
}
}
}
sigAgrData = { def __init__(self, name: str, sig_dict: dict):
'sig_BB':{ """Initialize DecisionManager with configuration.
'signalData': df_candle[990:1000],
'indicatorData' :{'ind_BB': df_candle[:1000]}
},
'sig_BB_2':{
'signalData': df_candle[990:1000],
'indicatorData' :{'ind_BB': df_candle[:1000]}
}
}
Args:
sigAgrRetroTemplate = { name: Identifier for this decision manager instance.
'sig_BB':{ sig_dict: Dictionary of signal configurations.
'signalData': None, """
'indicatorData' :{'ind_BB': None} self.RM = RiskManager()
},
'sig_BB_2':{
'signalData': None,
'indicatorData' :{'ind_BB': None}
}
}
'''
def __init__(self,name, sigDict: dict):
self.RM = riskManager()
self.DM = DealManager() self.DM = DealManager()
self.TV = trandeVoter(name) self.TV = TradeVoter(name)
self.SA = signalsAgrigator(sigDict) self.SA = SignalsAggregator(sig_dict)
self.sigDict = sigDict self.sig_dict = sig_dict
def get_online_answer(self, signals_ans: dict, price: float) -> dict:
def getOnlineAns(self, signalsAns: dict, price: float) -> dict: """Get trading decision for current market conditions.
probabilityDecsion = self.TV.getDecisionBySignals(self.getSignalsAns(signalsAns))
RMD = self.RM.getDecision(probabilityDecision=probabilityDecsion, price=price, deals = self.DM.deals) Args:
return RMD signals_ans: Dictionary of signal data.
price: Current market price.
def getSignalsAns(self, signalsDataDict: dict) -> dict:
return self.SA.getAns(signalsDataDict) Returns:
Risk-adjusted decision dictionary.
def getRightAns(self,value_1, value_2): """
probability_decision = self.TV.get_decision_by_signals(self.get_signals_answer(signals_ans))
ans='' rmd = self.RM.get_decision(
probability_decision=probability_decision,
price=price,
deals=self.DM.deals
)
return rmd
def get_signals_answer(self, signals_data_dict: dict) -> dict:
"""Get answers from all configured signals.
Args:
signals_data_dict: Dictionary of signal data inputs.
Returns:
Dictionary of signal results.
"""
return self.SA.get_answer(signals_data_dict)
def get_right_answer(self, value_1: float, value_2: float) -> str:
"""Determine correct direction based on value comparison.
Args:
value_1: First value (current).
value_2: Second value (next).
Returns:
Direction: 'down' if value decreases, 'up' if increases, 'none' if same.
"""
if value_1 > value_2: if value_1 > value_2:
ans = 'down' ans = 'down'
elif value_1 < value_2: elif value_1 < value_2:
ans = 'up' ans = 'up'
else: else:
ans = 'none' ans = 'none'
return ans return ans
def getRetroTrendAns(self, retroTemplateDict: dict, data: pd.DataFrame(), window: int) -> list: def get_retro_trend_answer(self, retro_template_dict: dict, data: pd.DataFrame, window: int) -> dict:
"""Run retrospective analysis on historical data.
reqSig={}
Slides a window through historical data to generate training data
for probability matrix generation.
Args:
retro_template_dict: Template defining signal structure.
data: Historical market data.
window: Size of sliding window.
Returns:
Dictionary with 'signalsAns' and 'rightAns' lists.
"""
req_sig = {}
ans = { ans = {
'signalsAns':[], 'signalsAns': [],
'rightAns':[] 'rightAns': []
} }
target = '' target = ''
for k in tqdm(range(data.shape[0]-window-1)):
for i in retroTemplateDict.keys():
reqSig[i] = {'signalData': data[k:k+window], 'indicatorData':{}}
target = self.SA.signals[i].params['target']
for j in retroTemplateDict[i]['indicatorData'].keys():
reqSig[i]['indicatorData'][j] = data[k:k+window]
sigAns = self.getSignalsAns(reqSig)
rightAns = self.getRightAns(data[target][k], data[target][k+1])
ans['signalsAns'].append(sigAns) for k in tqdm(range(data.shape[0] - window - 1)):
ans['rightAns'].append(rightAns) for i in retro_template_dict.keys():
req_sig[i] = {'signalData': data[k:k+window], 'indicatorData': {}}
target = self.SA.signals[i].params['target']
for j in retro_template_dict[i]['indicatorData'].keys():
req_sig[i]['indicatorData'][j] = data[k:k+window]
sig_ans = self.get_signals_answer(req_sig)
right_ans = self.get_right_answer(data[target][k], data[target][k+1])
ans['signalsAns'].append(sig_ans)
ans['rightAns'].append(right_ans)
return ans return ans
def generateMatrixProbabilityFromDict(self, dictSignals: dict) -> dict: def generate_matrix_probability_from_dict(self, dict_signals: dict) -> None:
self.TV.createMatrixAmounts(dictSignals['signalsAns'][0].keys()) """Generate probability matrices from retrospective signal data.
for i in range(len(dictSignals['signalsAns'])):
self.TV.setDecisionBySignals(signalDecisions = dictSignals['signalsAns'][i], Args:
trande = dictSignals['rightAns'][i]) dict_signals: Dictionary containing 'signalsAns' and 'rightAns' from retro analysis.
self.TV.generateMatrixProbability() """
self.TV.create_matrix_amounts(dict_signals['signalsAns'][0].keys())
def createDump(self,postfix='') -> str: for i in range(len(dict_signals['signalsAns'])):
dataDict = { self.TV.set_decision_by_signals(
'RM':self.RM, signal_decisions=dict_signals['signalsAns'][i],
'DM':self.DM, trande=dict_signals['rightAns'][i]
'TV':self.TV, )
'SA':self.SA, self.TV.generate_matrix_probability()
'sigDict':self.sigDict
def create_dump(self, postfix: str = '') -> str:
"""Save decision manager state to pickle file.
Args:
postfix: Optional postfix for filename.
Returns:
Absolute path to saved file.
"""
data_dict = {
'RM': self.RM,
'DM': self.DM,
'TV': self.TV,
'SA': self.SA,
'sigDict': self.sig_dict
} }
fileName='data_'+postfix+'.pickle' file_name = 'data_' + postfix + '.pickle'
with open(fileName, 'wb') as f: with open(file_name, 'wb') as f:
pickle.dump(dataDict, f) pickle.dump(data_dict, f)
return os.path.abspath(fileName) return os.path.abspath(file_name)
def loadDump(self,path: str) -> None: def load_dump(self, path: str) -> None:
"""Load decision manager state from pickle file.
Args:
path: Path to pickle file.
"""
with open(path, 'rb') as f: with open(path, 'rb') as f:
dataDict = pickle.load(f) data_dict = pickle.load(f)
self.RM = dataDict['RM'] self.RM = data_dict['RM']
self.DM = dataDict['DM'] self.DM = data_dict['DM']
self.TV = dataDict['TV'] self.TV = data_dict['TV']
self.SA = dataDict['SA'] self.SA = data_dict['SA']
self.sigDict = dataDict['sigDict'] self.sig_dict = data_dict['sigDict']

View File

@@ -2,7 +2,7 @@ import pandas as pd
import datetime import datetime
import numpy as np import numpy as np
import market_trade.core.CoreTraidMath import market_trade.core.CoreTradeMath
import market_trade.core.CoreDraw import market_trade.core.CoreDraw
class coreIndicator(): class coreIndicator():
@@ -99,12 +99,12 @@ class ind_BB(coreIndicator):
'window':self.options['window'] 'window':self.options['window']
} }
} }
ans['BB']=market_trade.core.CoreTraidMath.CoreMath(self.data,opMA).ans ans['BB']=market_trade.core.CoreTradeMath.CoreMath(self.data,opMA).ans
opSTD={'dataType':'ohcl', opSTD={'dataType':'ohcl',
'action':'findSTD', 'action':'findSTD',
'actionOptions':{'valueType':self.options['valueType'],'window':self.options['window']} 'actionOptions':{'valueType':self.options['valueType'],'window':self.options['window']}
} }
ans['STD']=market_trade.core.CoreTraidMath.CoreMath(self.data,opSTD).ans ans['STD']=market_trade.core.CoreTradeMath.CoreMath(self.data,opSTD).ans
ans['pSTD']=ans['BB']+ans['STD']*self.options['kDev'] ans['pSTD']=ans['BB']+ans['STD']*self.options['kDev']
ans['mSTD']=ans['BB']-ans['STD']*self.options['kDev'] ans['mSTD']=ans['BB']-ans['STD']*self.options['kDev']
ans['x']=np.array(self.data['date'][self.options['window']-1:].to_list()) ans['x']=np.array(self.data['date'][self.options['window']-1:].to_list())

View File

@@ -2,88 +2,158 @@ import pandas as pd
import datetime import datetime
import numpy as np import numpy as np
import market_trade.core.CoreTraidMath import market_trade.core.CoreTradeMath
class coreIndicator():
class CoreIndicator():
def __init__(self,options: dict, dataType: str = None, predictType: str = None, name: str = None): """Base class for technical indicators.
This class provides the foundation for implementing various technical
indicators used in trading signal generation.
"""
def __init__(self, options: dict, data_type: str = None, predict_type: str = None, name: str = None):
"""Initialize CoreIndicator with configuration options.
Args:
options: Dictionary containing indicator-specific parameters.
data_type: Type of data to process (e.g., 'ohlc'). Defaults to None.
predict_type: Type of prediction to make (e.g., 'trend'). Defaults to None.
name: Optional identifier. Defaults to None.
"""
self.options = options self.options = options
self.dataType = dataType #ochl self.data_type = data_type # ohlc
self.predictType = predictType #trend self.predict_type = predict_type # trend
def get_answer(self, data: pd.DataFrame):
def getAns(self, data: pd.DataFrame() ): """Get indicator answer from data.
Args:
data: DataFrame containing market data.
Returns:
Calculated indicator values or "ERROR" if not implemented.
"""
return "ERROR" return "ERROR"
class indicatorsAgrigator:
""" class IndicatorsAggregator:
indicators = { """Aggregates and manages multiple indicator instances.
'ind_BB':{
'className':ind_BB, Example usage:
'params':{'MeanType':'SMA','window':15,'valueType':'close','kDev':2.5} indicators = {
} 'ind_BB': {
} 'className': ind_BB,
dataDic={ 'params': {'MeanType': 'SMA', 'window': 15, 'valueType': 'close', 'kDev': 2.5}
'ind_BB':df_candle[:1000]
}
"""
def __init__ (self,indDict={}):
self.indDict = indDict
self.indInst = {}
self.ans={}
self.createIndicatorsInstance()
def createIndicatorsInstance(self):
for i in self.indDict.keys():
self.indInst[i]=self.indDict[i]['className'](self.indDict[i]['params'])
def getAns(self,dataDict={}):
ans={}
for i in dataDict.keys():
ans[i] = self.indInst[i].getAns(dataDict[i])
return ans
class ind_BB(coreIndicator):
"""
options
MeanType -> SMA
window -> int
valueType -> str: low, high, open, close
kDev -> float
"""
def __init__(self,options: dict,name = None):
super().__init__(
options = options,
dataType = 'ochl',
predictType = 'trend',
name = name
)
def getAns(self, data: pd.DataFrame()):
data=data.reset_index(drop=True)
ans={}
opMA={'dataType':'ohcl',
'action':'findMean',
'actionOptions':{
'MeanType':self.options['MeanType'],
'valueType':self.options['valueType'],
'window':self.options['window']
} }
} }
ans['BB']=market_trade.core.CoreTraidMath.CoreMath(data,opMA).ans data_dict = {
opSTD={'dataType':'ohcl', 'ind_BB': df_candle[:1000]
'action':'findSTD',
'actionOptions':{'valueType':self.options['valueType'],'window':self.options['window']}
} }
ans['STD']=market_trade.core.CoreTraidMath.CoreMath(data,opSTD).ans aggregator = IndicatorsAggregator(indicators)
ans['pSTD']=ans['BB']+ans['STD']*self.options['kDev'] results = aggregator.get_answer(data_dict)
ans['mSTD']=ans['BB']-ans['STD']*self.options['kDev'] """
ans['x']=np.array(data['date'][self.options['window']-1:].to_list())
self.ans= ans def __init__(self, ind_dict=None):
"""Initialize aggregator with indicator dictionary.
Args:
ind_dict: Dictionary mapping indicator names to configurations.
Defaults to empty dict if not provided.
"""
self.ind_dict = ind_dict if ind_dict is not None else {}
self.ind_instances = {}
self.ans = {}
self.create_indicators_instance()
def create_indicators_instance(self):
"""Create instances of all configured indicators."""
for i in self.ind_dict.keys():
self.ind_instances[i] = self.ind_dict[i]['className'](self.ind_dict[i]['params'])
def get_answer(self, data_dict=None):
"""Calculate answers from all indicators.
Args:
data_dict: Dictionary mapping indicator names to their data.
Defaults to empty dict.
Returns:
Dictionary of indicator results.
"""
if data_dict is None:
data_dict = {}
ans = {}
for i in data_dict.keys():
ans[i] = self.ind_instances[i].get_answer(data_dict[i])
return ans
class ind_BB(CoreIndicator):
"""Bollinger Bands indicator implementation.
Calculates Bollinger Bands using moving average and standard deviation.
Required options:
MeanType: Type of moving average (e.g., 'SMA')
window: Period for calculations (int)
valueType: Price type to use ('low', 'high', 'open', 'close')
kDev: Standard deviation multiplier (float)
"""
def __init__(self, options: dict, name=None):
"""Initialize Bollinger Bands indicator.
Args:
options: Configuration parameters dictionary.
name: Optional identifier.
"""
super().__init__(
options=options,
data_type='ohlc',
predict_type='trend',
name=name
)
def get_answer(self, data: pd.DataFrame):
"""Calculate Bollinger Bands values.
Args:
data: DataFrame with OHLC price data.
Returns:
Dictionary containing:
- BB: Middle band (moving average)
- STD: Standard deviation
- pSTD: Upper band (BB + kDev * STD)
- mSTD: Lower band (BB - kDev * STD)
- x: Date array
"""
data = data.reset_index(drop=True)
ans = {}
op_ma = {
'dataType': 'ohcl',
'action': 'findMean',
'actionOptions': {
'MeanType': self.options['MeanType'],
'valueType': self.options['valueType'],
'window': self.options['window']
}
}
ans['BB'] = market_trade.core.CoreTradeMath.CoreMath(data, op_ma).ans
op_std = {
'dataType': 'ohcl',
'action': 'findSTD',
'actionOptions': {
'valueType': self.options['valueType'],
'window': self.options['window']
}
}
ans['STD'] = market_trade.core.CoreTradeMath.CoreMath(data, op_std).ans
ans['pSTD'] = ans['BB'] + ans['STD'] * self.options['kDev']
ans['mSTD'] = ans['BB'] - ans['STD'] * self.options['kDev']
ans['x'] = np.array(data['date'][self.options['window']-1:].to_list())
self.ans = ans
return ans return ans

View File

@@ -3,27 +3,54 @@ import datetime
import numpy as np import numpy as np
import random import random
class riskManager:
class RiskManager:
def __init__(self,commision=0.04): """Manages risk assessment and position sizing for trading decisions.
self.commision = commision
pass Evaluates trading decisions from probability-based signals and applies
def getDecision(self,probabilityDecision, price, deals=None) -> dict: risk management rules including commission calculations and profit targets.
"""
def __init__(self, commission: float = 0.04):
"""Initialize RiskManager with commission rate.
Args:
commission: Commission rate as decimal (default 0.04 = 4%).
"""
self.commission = commission
def get_decision(self, probability_decision: dict, price: float, deals: pd.DataFrame = None) -> dict:
"""Evaluate trading decision with risk management rules.
Args:
probability_decision: Dictionary containing 'trande' direction from TradeVoter.
price: Current market price.
deals: DataFrame of active positions (optional).
Returns:
Dictionary with 'decision' ('buy', 'sell', 'none') and additional fields:
- For 'buy': includes 'amount' field
- For 'sell': includes 'deals' list of position UUIDs to close
"""
ans = {} ans = {}
ans['decision'] = 'none' ans['decision'] = 'none'
if probabilityDecision['trande'] == 'up':
if probability_decision['trande'] == 'up':
ans['decision'] = 'buy' ans['decision'] = 'buy'
ans['amount'] = 1 ans['amount'] = 1
elif probabilityDecision['trande'] == 'none':
elif probability_decision['trande'] == 'none':
ans['decision'] = 'none' ans['decision'] = 'none'
elif probabilityDecision['trande'] == 'down':
for i in range(deals.shape[0]): elif probability_decision['trande'] == 'down':
ans['decision'] = 'None' if deals is not None:
ans['deals'] = [] for i in range(deals.shape[0]):
row = deals.iloc[i] ans['decision'] = 'none'
if row.startPrice < price*pow(1+self.commission,2): ans['deals'] = []
ans['decision'] = 'sell' row = deals.iloc[i]
ans['deals'].append(row.name) # Check if position is profitable after commission
if row.startPrice < price * pow(1 + self.commission, 2):
ans['decision'] = 'sell'
ans['deals'].append(row.name)
return ans return ans

View File

@@ -2,7 +2,7 @@ import pandas as pd
import datetime import datetime
import numpy as np import numpy as np
import market_trade.core.CoreTraidMath import market_trade.core.CoreTradeMath
import market_trade.core.CoreDraw import market_trade.core.CoreDraw
from tqdm import tqdm from tqdm import tqdm

View File

@@ -2,111 +2,172 @@ import pandas as pd
import datetime import datetime
import numpy as np import numpy as np
import market_trade.core.CoreTraidMath import market_trade.core.CoreTradeMath
#import market_trade.core.CoreDraw
from tqdm import tqdm from tqdm import tqdm
from market_trade.core.indicators_v2 import * from market_trade.core.indicators_v2 import IndicatorsAggregator, ind_BB
class CoreSignalTrade:
"""Base class for trading signals.
class coreSignalTrande: Provides foundation for generating trading signals based on technical indicators.
"""
def __init__(self, name: str, req: dict, dataType: str):
def __init__(self, name: str, req: dict, data_type: str):
"""Initialize signal generator.
Args:
name: Signal identifier.
req: Configuration dictionary containing params and indicators.
data_type: Type of data to process (e.g., 'ohlc').
"""
self.name = name self.name = name
self.agrigateInds = self.createIndicatorsInstance(req) self.aggregate_indicators = self.create_indicators_instance(req)
self.params = req['params'] self.params = req['params']
self.dataType = dataType self.data_type = data_type
def createIndicatorsInstance(self,req: dict) -> dict:
return indicatorsAgrigator(req['indicators'])
def getIndAns(self, dataDict: dict) -> dict:
return self.agrigateInds.getAns(dataDict)
def getAns(self, data: pd.DataFrame(), indDataDict: dict) -> dict:
return self.getSigAns(data, self.getIndAns(indDataDict))
class sig_BB(coreSignalTrande):
"""
ind keys:
ind_BB
"""
def __init__(self, name: str, req:dict):
super().__init__(name, req, 'ochl')
def getSigAns(self, data: pd.DataFrame(), indAnsDict: dict) -> dict:
lastValue = data[self.params['source']].to_list()[-1] def create_indicators_instance(self, req: dict) -> IndicatorsAggregator:
if lastValue>indAnsDict['ind_BB']['pSTD'][-1]: """Create indicators aggregator from configuration.
ans='down'
elif lastValue<indAnsDict['ind_BB']['mSTD'][-1]: Args:
ans='up' req: Request dictionary containing indicators configuration.
else:
ans='none' Returns:
IndicatorsAggregator instance.
return ans """
return IndicatorsAggregator(req['indicators'])
class signalsAgrigator: def get_indicator_answer(self, data_dict: dict) -> dict:
"""Get answers from all indicators.
Args:
data_dict: Dictionary mapping indicator names to data.
Returns:
Dictionary of indicator results.
"""
return self.aggregate_indicators.get_answer(data_dict)
def get_answer(self, data: pd.DataFrame, ind_data_dict: dict) -> dict:
"""Get signal answer from data and indicator results.
Args:
data: Market data DataFrame.
ind_data_dict: Dictionary of indicator data.
Returns:
Signal answer (direction).
"""
return self.get_signal_answer(data, self.get_indicator_answer(ind_data_dict))
class sig_BB(CoreSignalTrade):
"""Bollinger Bands signal generator.
Generates trading signals based on Bollinger Bands indicator:
- 'up' when price is below lower band
- 'down' when price is above upper band
- 'none' when price is within bands
Required indicator keys:
ind_BB: Bollinger Bands indicator
""" """
sigAgrReq = {
'sig_BB':{ def __init__(self, name: str, req: dict):
'className':sig_BB, """Initialize Bollinger Bands signal.
'params':{'source':'close','target':'close'},
'indicators':{ Args:
'ind_BB':{ name: Signal identifier.
'className':ind_BB, req: Configuration dictionary.
'params':{'MeanType':'SMA','window':15,'valueType':'close','kDev':2.5} """
} super().__init__(name, req, 'ohlc')
}
}, def get_signal_answer(self, data: pd.DataFrame, ind_ans_dict: dict) -> str:
'sig_BB_2':{ """Calculate signal from Bollinger Bands.
'className':sig_BB,
'params':{'source':'close','target':'close'}, Args:
'indicators':{ data: Market data DataFrame.
'ind_BB':{ ind_ans_dict: Dictionary containing indicator results.
'className':ind_BB,
'params':{'MeanType':'SMA','window':30,'valueType':'close','kDev':2} Returns:
} Signal direction: 'up', 'down', or 'none'.
"""
last_value = data[self.params['source']].to_list()[-1]
if last_value > ind_ans_dict['ind_BB']['pSTD'][-1]:
ans = 'down'
elif last_value < ind_ans_dict['ind_BB']['mSTD'][-1]:
ans = 'up'
else:
ans = 'none'
return ans
class SignalsAggregator:
"""Aggregates and manages multiple signal generators.
Example usage:
sig_config = {
'sig_BB': {
'className': sig_BB,
'params': {'source': 'close', 'target': 'close'},
'indicators': {
'ind_BB': {
'className': ind_BB,
'params': {'MeanType': 'SMA', 'window': 15, 'valueType': 'close', 'kDev': 2.5}
}
}
} }
} }
}
sigAgrData = {
'sig_BB':{
'signalData': df_candle[990:1000],
'indicatorData' :{'ind_BB': df_candle[:1000]}
},
'sig_BB_2':{
'signalData': df_candle[990:1000],
'indicatorData' :{'ind_BB': df_candle[:1000]}
}
}
sig_data = {
'sig_BB': {
'signalData': df_candle[990:1000],
'indicatorData': {'ind_BB': df_candle[:1000]}
}
}
aggregator = SignalsAggregator(sig_config)
results = aggregator.get_answer(sig_data)
""" """
def __init__ (self,req:dict): def __init__(self, req: dict):
self.signals = self.createSignalsInstance(req) """Initialize signals aggregator.
def createSignalsInstance(self, siganlsDict: dict) -> dict: Args:
req: Dictionary mapping signal names to configurations.
"""
self.signals = self.create_signals_instance(req)
def create_signals_instance(self, signals_dict: dict) -> dict:
"""Create instances of all configured signals.
Args:
signals_dict: Dictionary of signal configurations.
Returns:
Dictionary of signal instances.
"""
ans = {} ans = {}
for i in siganlsDict.keys(): for i in signals_dict.keys():
ans[i]=siganlsDict[i]['className'](name = i, req = siganlsDict[i]) ans[i] = signals_dict[i]['className'](name=i, req=signals_dict[i])
return ans return ans
def getAns(self, dataDict: dict) -> dict: def get_answer(self, data_dict: dict) -> dict:
"""Calculate answers from all signals.
Args:
data_dict: Dictionary mapping signal names to their data.
Each entry should contain 'signalData' and 'indicatorData'.
Returns:
Dictionary of signal results.
"""
ans = {} ans = {}
for i in dataDict.keys(): for i in data_dict.keys():
ans[i] = self.signals[i].getAns(data = dataDict[i]['signalData'], ans[i] = self.signals[i].get_answer(
indDataDict = dataDict[i]['indicatorData']) data=data_dict[i]['signalData'],
return ans ind_data_dict=data_dict[i]['indicatorData']
)
return ans

View File

@@ -3,82 +3,72 @@ import datetime
import numpy as np import numpy as np
#import random #import random
class trandeVoter(): class TradeVoter():
def __init__(self,name): def __init__(self, name):
self.name = name # просто имя self.name = name # Instance identifier
self.trandeValuesList = ['up','none','down'] #словарь трегдов self.trade_values_list = ['up', 'none', 'down'] # Valid trade directions
self.matrixAmounts = None # матрица сумм self.matrix_amounts = None # Sum matrix for signal combinations
self.keysMatrixAmounts = None #ключи матрицы сумм, техническое поле self.keys_matrix_amounts = None # Matrix keys, technical field
self.matrixProbability = None # матрица вероятностей self.matrix_probability = None # Probability matrix for decision making
#функция которая создает df с заданным набором колонок и индексов. индексы - уникальные соотношения # Function to create DataFrame with specified columns and indices. Indices are unique combinations.
def createDFbyNames(self, namesIndex, namesColoms,defaultValue=0.0): def create_df_by_names(self, names_index, column_names, default_value=0.0):
df = pd.DataFrame(dict.fromkeys(namesColoms, [defaultValue]*pow(3,len(namesIndex))), df = pd.DataFrame(dict.fromkeys(column_names, [default_value]*pow(3, len(names_index))),
index=pd.MultiIndex.from_product([self.trandeValuesList]*len(namesIndex), names=namesIndex) index=pd.MultiIndex.from_product([self.trade_values_list]*len(names_index), names=names_index)
#,columns=namesColoms
) )
return(df) return df
#создание матрицы сумм с дефолтным значением
def createMatrixAmounts(self,namesIndex: list) -> pd.DataFrame():
self.matrixAmounts = self.createDFbyNames(namesIndex,self.trandeValuesList,0)
self.keysMatrixAmounts = self.matrixAmounts.to_dict('tight')['index_names']
self.createMatrixProbability(namesIndex)
return(self.matrixAmounts)
#создание матрицы вероятностей с дефолтным значением
def createMatrixProbability(self,namesIndex: list) -> pd.DataFrame():
self.matrixProbability = self.createDFbyNames(namesIndex,self.trandeValuesList)
return(self.matrixProbability)
#установка значений в матрицы сумм. signalDecisions - значения индикаторов key:value; trande - реальное значение
def setDecisionBySignals(self,signalDecisions: dict,trande: str) -> None:
buff=[]
for i in self.keysMatrixAmounts:
buff.append(signalDecisions[i])
self.matrixAmounts.loc[tuple(buff),trande] += 1
#заполнение матрицы вероятностей вычисляемыми значениями из матрицы сумм
def generateMatrixProbability(self) -> None:
for i in range(self.matrixAmounts.shape[0]):
print(self.matrixAmounts)
rowSum=sum(self.matrixAmounts.iloc[i]) + 1
self.matrixProbability.iloc[i]['up'] = self.matrixAmounts.iloc[i]['up'] / rowSum
self.matrixProbability.iloc[i]['none'] = self.matrixAmounts.iloc[i]['none'] / rowSum
self.matrixProbability.iloc[i]['down'] = self.matrixAmounts.iloc[i]['down'] / rowSum
#получение рещения из матрицы вероятностей по заданным значениям сигналов # Create sum matrix with default value
def getDecisionBySignals(self,signalDecisions: dict) -> dict: def create_matrix_amounts(self, names_index: list) -> pd.DataFrame:
self.matrix_amounts = self.create_df_by_names(names_index, self.trade_values_list, 0)
self.keys_matrix_amounts = self.matrix_amounts.to_dict('tight')['index_names']
self.create_matrix_probability(names_index)
return self.matrix_amounts
# Create probability matrix with default value
def create_matrix_probability(self, names_index: list) -> pd.DataFrame:
self.matrix_probability = self.create_df_by_names(names_index, self.trade_values_list)
return self.matrix_probability
# Set values in sum matrix. signalDecisions - indicator values key:value; trande - actual value
def set_decision_by_signals(self, signal_decisions: dict, trande: str) -> None:
buff = []
for i in self.keys_matrix_amounts:
buff.append(signal_decisions[i])
self.matrix_amounts.loc[tuple(buff), trande] += 1
# Fill probability matrix with calculated values from sum matrix
def generate_matrix_probability(self) -> None:
for i in range(self.matrix_amounts.shape[0]):
print(self.matrix_amounts)
row_sum = sum(self.matrix_amounts.iloc[i]) + 1
self.matrix_probability.iloc[i]['up'] = self.matrix_amounts.iloc[i]['up'] / row_sum
self.matrix_probability.iloc[i]['none'] = self.matrix_amounts.iloc[i]['none'] / row_sum
self.matrix_probability.iloc[i]['down'] = self.matrix_amounts.iloc[i]['down'] / row_sum
# Get decision from probability matrix based on signal values
def get_decision_by_signals(self, signal_decisions: dict) -> dict:
ans = {} ans = {}
spliceSearch =self.matrixProbability.xs(tuple(signalDecisions.values()), splice_search = self.matrix_probability.xs(tuple(signal_decisions.values()),
level=list(signalDecisions.keys()) level=list(signal_decisions.keys())
) )
ans['probability'] = spliceSearch.to_dict('records')[0] ans['probability'] = splice_search.to_dict('records')[0]
ans['trande'] = spliceSearch.iloc[0].idxmax() ans['trande'] = splice_search.iloc[0].idxmax()
return ans return ans
#получение матриц вероятностей и суммы в видей словарей # Get probability and sum matrices as dictionaries
def getMatrixDict(self) -> dict: def get_matrix_dict(self) -> dict:
ans={} ans = {}
ans['amounts'] = self.matrixAmounts.to_dict('tight') ans['amounts'] = self.matrix_amounts.to_dict('tight')
ans['probability'] = self.matrixProbability.to_dict('tight') ans['probability'] = self.matrix_probability.to_dict('tight')
return ans return ans
#установка матриц вероятностей и суммы в видей словарей
def setMatrixDict(self,matrixDict: dict) -> dict:
if matrixDict['amounts'] != None:
self.matrixAmounts = pd.DataFrame.from_dict(y['amounts'], orient='tight')
if matrixDict['probability'] != None:
self.matrixProbability = pd.DataFrame.from_dict(y['probability'], orient='tight')
# Set probability and sum matrices from dictionaries
def set_matrix_dict(self, matrix_dict: dict) -> dict:
if matrix_dict['amounts'] != None:
self.matrix_amounts = pd.DataFrame.from_dict(y['amounts'], orient='tight')
if matrix_dict['probability'] != None:
self.matrix_probability = pd.DataFrame.from_dict(y['probability'], orient='tight')

View File

@@ -1,63 +1,64 @@
from market_trade.core.decisionManager_v2 import * from market_trade.core.decisionManager_v2 import DecisionManager
from market_trade.core.indicators_v2 import * from market_trade.core.indicators_v2 import ind_BB
from market_trade.core.signals_v2 import * from market_trade.core.signals_v2 import sig_BB
import market_trade.data.dataloader import market_trade.data.dataloader
sigAgrReq = { sig_agr_req = {
'sig_BB':{ 'sig_BB': {
'className':sig_BB, 'className': sig_BB,
'params':{'source':'close','target':'close'}, 'params': {'source': 'close', 'target': 'close'},
'indicators':{ 'indicators': {
'ind_BB':{ 'ind_BB': {
'className':ind_BB, 'className': ind_BB,
'params':{'MeanType':'SMA','window':30,'valueType':'close','kDev':2.5} 'params': {'MeanType': 'SMA', 'window': 30, 'valueType': 'close', 'kDev': 2.5}
} }
} }
}, },
'sig_BB_2':{ 'sig_BB_2': {
'className':sig_BB, 'className': sig_BB,
'params':{'source':'close','target':'close'}, 'params': {'source': 'close', 'target': 'close'},
'indicators':{ 'indicators': {
'ind_BB':{ 'ind_BB': {
'className':ind_BB, 'className': ind_BB,
'params':{'MeanType':'SMA','window':30,'valueType':'close','kDev':2} 'params': {'MeanType': 'SMA', 'window': 30, 'valueType': 'close', 'kDev': 2}
} }
} }
} }
} }
test = decsionManager('Pipa', sigAgrReq) test = DecisionManager('Pipa', sig_agr_req)
import pandas as pd import pandas as pd
df_candle = pd.read_csv("../../data/EURUSD_price_candlestick.csv") df_candle = pd.read_csv("../../data/EURUSD_price_candlestick.csv")
df_candle["date"] = df_candle["timestamp"] df_candle["date"] = df_candle["timestamp"]
sigAgrRetroTemplate = {
'sig_BB':{ sig_agr_retro_template = {
'sig_BB': {
'signalData': None, 'signalData': None,
'indicatorData' :{'ind_BB': None} 'indicatorData': {'ind_BB': None}
}, },
'sig_BB_2':{ 'sig_BB_2': {
'signalData': None, 'signalData': None,
'indicatorData' :{'ind_BB': None} 'indicatorData': {'ind_BB': None}
} }
} }
retroAns = test.getRetroTrendAns(sigAgrRetroTemplate,df_candle[5000:6000].reset_index(drop=True),40) retro_ans = test.get_retro_trend_answer(sig_agr_retro_template, df_candle[5000:6000].reset_index(drop=True), 40)
test.generateMatrixProbabilityFromDict(retroAns) test.generate_matrix_probability_from_dict(retro_ans)
sigAgrData = { sig_agr_data = {
'sig_BB':{ 'sig_BB': {
'signalData': df_candle[990:1000], 'signalData': df_candle[990:1000],
'indicatorData' :{'ind_BB': df_candle[:1000]} 'indicatorData': {'ind_BB': df_candle[:1000]}
}, },
'sig_BB_2':{ 'sig_BB_2': {
'signalData': df_candle[990:1000], 'signalData': df_candle[990:1000],
'indicatorData' :{'ind_BB': df_candle[:1000]} 'indicatorData': {'ind_BB': df_candle[:1000]}
} }
} }
test.getOnlineAns(sigAgrData, 0.0) test.get_online_answer(sig_agr_data, 0.0)