moved things to a new place !

This commit is contained in:
2024-03-15 20:15:46 +01:00
parent 5f8cea85ce
commit 8d8d102a14
28 changed files with 3967 additions and 208 deletions

View File

@@ -0,0 +1,267 @@
#!/usr/bin/env python
# coding: utf-8
# In[2]:
import pandas as pd
import datetime
import numpy as np
#import plotly as pl
#import plotly.graph_objs as go
#from plotly.offline import init_notebook_mode, iplot
#from plotly.subplots import make_subplots
#init_notebook_mode()
import CoreTraidMath
import CoreDraw
# In[3]:
class coreIndicator():
def __init__(self,
data=pd.DataFrame(),
options={},
showMode='None',
):
'''
showMode = None/Ind/PartOf
'''
self.data=data
self.showMode=showMode
self.options=options
self.overlayInd=None #True/False
self.ans=None
self.figDict=None
def getAns(self,data=None):
if type(data)!=type(None):
self.data=data
self.ans=self.getCalculate()
if self.showMode=='Ind' or self.showMode=='PartOf':
self.figDict=self.getFigDict()
if self.showMode=='Ind':
self.getFig()
return self.ans
def getFig(self,row=1):
CoreDraw.coreDraw(self.figDict,True)
def getCalculate(self):
return "Error"
def getFigDict(self):
return "Error"
class indicatorAgrigator():
'''
Тема чисто для отладки
jj=indicatorAgrigator().runAll([o1,o2],df_candle[:30])
#jj.createIndFromList([o1,o2])
#jj.calculateInd(df_candle[:30])
'''
def __init__(self):
self.indList=None
self.data=None
def createInd(self,classDict):
return classDict['name'](
options=classDict['params'],
showMode=classDict['showMode']
)
def createIndFromList(self,indList):
self.indList=indList
ans=[]
for i in self.indList:
ans.append(self.createInd(i))
self.indList=ans
return ans
def calculateInd(self,data):
self.data=data
for i in self.indList:
#i.getAns(data)
i.data=self.data
i.ans=i.getCalculate()
i.figDict=i.getFigDict()
#i.getFig()
def agrigateFig(self):
req=[[]]
for i in self.indList:
if i.overlayInd==True:
req[0].append(i)
else:
req.append([i])
CoreDraw.agrigateFig(req,True)
def runAll(self,indList,df,needDraw=False):
self.createIndFromList(indList)
self.calculateInd(df)
if needDraw:
self.agrigateFig()
# In[4]:
class ind_BB(coreIndicator):
def getCalculate(self):
self.overlayInd=True
ans={}
opMA={'dataType':'ohcl',
'action':'findMean',
'actionOptions':{
'MeanType':self.options['MeanType'],
'valueType':self.options['valueType'],
'window':self.options['window']
}
}
ans['BB']=CoreTraidMath.CoreMath(self.data,opMA).ans
opSTD={'dataType':'ohcl',
'action':'findSTD',
'actionOptions':{'valueType':self.options['valueType'],'window':self.options['window']}
}
ans['STD']=CoreTraidMath.CoreMath(self.data,opSTD).ans
ans['pSTD']=ans['BB']+ans['STD']*self.options['kDev']
ans['mSTD']=ans['BB']-ans['STD']*self.options['kDev']
ans['x']=np.array(self.data['date'][self.options['window']-1:].to_list())
return ans
def getFigDict(self,row=1):
req=[]
req.append({
'vtype':'Scatter',
'df':pd.DataFrame(
{'value':self.ans['BB'],'date':self.ans['x']}) ,
'row':row,
'col':1,
'name':'BB'
})
req.append({
'vtype':'Scatter',
'df':pd.DataFrame(
{'value':self.ans['pSTD'],'date':self.ans['x']}) ,
'row':row,
'col':1,
'name':'pSTD'
})
req.append({
'vtype':'Scatter',
'df':pd.DataFrame(
{'value':self.ans['mSTD'],'date':self.ans['x']}) ,
'row':row,
'col':1,
'name':'mSTD'
})
return req
# In[5]:
class ind_OCHL(coreIndicator):
def getCalculate(self):
self.overlayInd=True
def getFigDict(self,row=1):
req=[]
req.append({
'vtype':'OCHL',
'df':self.data,
'row':1,
'col':1,
'name':'OHCL'
})
return req
# In[7]:
df_candle = pd.read_csv("../data/EURUSD_price_candlestick.csv")
df_candle.rename(columns={'timestamp': 'date'}, inplace=True)
df_candle
# In[8]:
o1={
'name':ind_OCHL,
'params':{},
'showMode':'PartOf',
}
o2={
'name':ind_BB,
'params':{'MeanType':'SMA','window':25,'valueType':'low','kDev':2},
'showMode':'PartOf',
}
jj=indicatorAgrigator().runAll([o1,o2],df_candle[:300],True)
#jj.createIndFromList([o1,o2])
#jj.calculateInd(df_candle[:30])
# In[9]:
op={'MeanType':'SMA','window':5,'valueType':'low','kDev':2}
a=ind_BB(df_candle[:100],op,'PartOf')
# In[10]:
a.getAns()
# In[11]:
b=ind_OCHL(df_candle[:30],{},'Ind')
b.getAns(df_candle[:100])
# In[12]:
opc={'MeanType':'SMA','window':20,'valueType':'low','kDev':2}
c=ind_BB(df_candle[:100],opc,'PartOf')
c.getAns()
# In[13]:
hhh = CoreDraw.agrigateFig([[b,a,c]],True)
# In[14]:
import indicators
# In[15]:
op_1={'MeanType':'SMA','window':5,'valueType':'low','kDev':2}
test_1=indicators.ind_BB(df_candle[:100],op)
test_1.getAns()
# In[ ]:

View File

@@ -0,0 +1,45 @@
#!/usr/bin/env python
# coding: utf-8
# In[1]:
import pandas as pd
import datetime
import numpy as np
import random
# In[2]:
class riskManager:
def __init__(self,commision=0.04):
self.commision = commision
pass
def getDecision(self,signalDecision,probabilityDecision, price, deals=None) -> dict:
ans = {}
if probabilityDecision['trande'] == 'up':
ans['decision'] = 'buy'
ans['amount'] = 1
elif probabilityDecision['trande'] == 'none':
ans['decision'] = 'none'
elif probabilityDecision['trande'] == 'down':
for i in deals.shape[0]:
ans['decision'] = 'None'
ans['deals'] = []
row = deals.iloc[i]
if row.startPrice < price*pow(1+self.commission,2):
ans['decision'] = 'sell'
ans['deals'].append(row.name)
return ans
# In[ ]:

View File

@@ -0,0 +1,390 @@
#!/usr/bin/env python
# coding: utf-8
# In[1]:
import pandas as pd
import datetime
import numpy as np
import CoreTraidMath
import CoreDraw
from tqdm import tqdm
from indicators import *
# In[2]:
df_candle = pd.read_csv("../data/EURUSD_price_candlestick.csv")
df_candle.rename(columns={'timestamp': 'date'}, inplace=True)
df_candle
# In[3]:
class coreSignalTrande():
def __init__(self,
data=pd.DataFrame(),
dataType='candel',
mode='online',
batchSize=None,
indParams=None,
signalParams=None,
#needFig=False,
#showOnlyIndex=False,
#drawFig=False,
#equalityGap=0
):
self.data=data.reset_index(drop=True)
self.onlineData=data.reset_index(drop=True)
self.dataType=dataType
self.mode=mode
self.ans=None
self.softAnalizList=np.asarray([])
self.hardAnalizList=np.asarray([])
self.analizMetrics={}
self.indParams=indParams
self.signalParams=signalParams
self.batchSize=batchSize
#self.needFig=needFig
#self.showOnlyIndex=showOnlyIndex
#self.drawFig=drawFig
#self.equalityGap=equalityGap
#Роутер получения ответа
def getAns(self,data):
#ans='Error: unknown Mode!'
ans=None
print("Start processing...")
if self.mode == 'online':
ans=self.getOnlineAns(data.reset_index(drop=True))
elif self.mode == 'retro':
ans=self.getRetroAns(data)
elif self.mode == 'retroFast':
ans=self.getRetroFastAns(data)
print("Processing DONE!")
return ans
#Ретро режим, где расширяется окно добавлением новых элементов
def getRetroAns(self,data):
ans=np.asarray([])
for i in tqdm(range(self.batchSize,len(data)-1)):
#self.onlineData=self.data[0:i]
window_data = data[0:i]
window_data.reset_index(drop=True)
ans=np.append(ans,(self.getOnlineAns(window_data)))
self.ans=ans
self.getAnaliz()
self.getMetrix()
return ans
#Ретро режим, где двигается окно
def getRetroFastAns(self,data):
#print('d - ',data)
ans=np.asarray([])
for i in tqdm(range(len(data)-1-self.batchSize)):
#self.onlineData=self.data[i:i+self.batchSize]
window_data = data[i:i+self.batchSize]
#print('win - ',window_data)
window_data.reset_index(drop=True)
#print('win - ',window_data)
ans=np.append(ans,(self.getOnlineAns(window_data)))
self.ans=ans
self.getAnaliz()
self.getMetrix()
return ans
#Метод, который будет переопределять каждый дочерний класс
def getOnlineAns(self):
return 'Error'
def getAnaliz(self):
print("Start analiz...")
for i in (range(len(self.ans))):
sourceValue=self.data[self.signalParams['source']][i+self.batchSize]
targetValue=self.data[self.signalParams['target']][i+self.batchSize + 1]
if (targetValue)>sourceValue:
if self.ans[i]==1:
self.softAnalizList=np.append(self.softAnalizList,1)
self.hardAnalizList=np.append(self.hardAnalizList,1)
elif self.ans[i]==-1:
self.softAnalizList=np.append(self.softAnalizList,-1)
self.hardAnalizList=np.append(self.hardAnalizList,-1)
else:
self.softAnalizList=np.append(self.softAnalizList,0)
self.hardAnalizList=np.append(self.hardAnalizList,-1)
elif (targetValue)<sourceValue:
if self.ans[i]==1:
self.softAnalizList=np.append(self.softAnalizList,-1)
self.hardAnalizList=np.append(self.hardAnalizList,-1)
elif self.ans[i]==-1:
self.softAnalizList=np.append(self.softAnalizList,1)
self.hardAnalizList=np.append(self.hardAnalizList,1)
else:
self.softAnalizList=np.append(self.softAnalizList,0)
self.hardAnalizList=np.append(self.hardAnalizList,-1)
else:
if self.ans[i]==1:
self.softAnalizList=np.append(self.softAnalizList,-1)
self.hardAnalizList=np.append(self.hardAnalizList,-1)
elif self.ans[i]==-1:
self.softAnalizList=np.append(self.softAnalizList,-1)
self.hardAnalizList=np.append(self.hardAnalizList,-1)
else:
self.softAnalizList=np.append(self.softAnalizList,0)
self.hardAnalizList=np.append(self.hardAnalizList,1)
print("Analiz DONE!")
return 0
def getMeteixDict(self,d):
'''
1 - (сбывшиеся + несбывшиеся) \ (сбывшиеся + несбывшиеся +0)
2 - (сбывшиеся - несбывшиеся) \ (сбывшиеся + несбывшиеся +0)
'''
return {
'1':(d['1'] + d['-1']) / (d['1'] + d['-1'] + d['0']),
'2':(d['1'] - d['-1']) / (d['1'] + d['-1'] + d['0']),
}
def getMetrix(self):
softAnalizCount = {'-1':0,'0':0,'1':0}
hardAnalizCount = {'-1':0,'0':0,'1':0}
for i in range(len(self.softAnalizList)):
softAnalizCount[str(int(self.softAnalizList[i]))]+=1
hardAnalizCount[str(int(self.hardAnalizList[i]))]+=1
self.analizMetrics = {'softAnaliz':self.getMeteixDict(softAnalizCount),
'hardAnaliz':self.getMeteixDict(hardAnalizCount)
}
# In[4]:
class signal_BB(coreSignalTrande):
def __init__(self,
data=pd.DataFrame(),
dataType='candel',
mode='online',
batchSize=None,
indParams=None,
signalParams=None,
):
super().__init__(
data=data,
dataType=dataType,
mode=mode,
batchSize=batchSize,
indParams=indParams,
signalParams=signalParams,
)
if self.indParams == None:
indParams={'MeanType':'SMA','window':15,'valueType':'low','kDev':2}
else:
indParams=self.indParams
self.BB=ind_BB(
data=data,
options=indParams,
)
def getOnlineAns(self,data):
ans=0
#print(data)
self.BB.getAns(data)
#print(BB)
lastValue=data[signalParams['source']].to_list()[-1]
if lastValue>self.BB.ans['pSTD'][-1]:
ans=-1
elif lastValue<self.BB.ans['mSTD'][-1]:
ans=+1
else:
ans=0
return ans
# In[5]:
ind_params={'MeanType':'SMA','window':15,'valueType':'close','kDev':2.5}
signalParams={'source':'close','target':'close'}
b=signal_BB(data=df_candle[:99999],
mode='retroFast',
indParams=ind_params,
signalParams=signalParams,
batchSize=15
)
# In[6]:
a=b.getAns(df_candle[:99900])
# In[7]:
b.analizMetrics
# In[ ]:
# In[ ]:
# In[ ]:
# In[8]:
from signals import *
# In[9]:
class signalAgrigator:
"""
dictAgrigSignal
key - name str
value - dict
className - class
indParams - dict
signalParams - dict
batchSize - int
"""
def __init__(self,
data=pd.DataFrame(),
dictAgrigSignal={},
mode='online',
dataType='candel',
batchSize=None
):
self.createSingnalInstances(
data,
dictAgrigSignal,
dataType,
batchSize
)
self.mode=mode
def createSingnalInstances(
self,
data,
dictAgrigSignal,
dataType,
batchSize
):
ans={}
for i in dictAgrigSignal:
ans[i]=dictAgrigSignal[i]['className'](
data=data,
dataType=dataType,
batchSize=batchSize,
indParams=dictAgrigSignal[i]['indParams'],
signalParams=dictAgrigSignal[i]['signalParams'],
mode=self.mode
)
self.signalsInstances = ans
return ans
def getAns(self, data):
ans={}
if self.mode == 'online':
for i in self.signalsInstances:
ans[i]=(self.signalsInstances[i].getAns(data))
elif self.mode == 'retroFast' or self.mode == 'retro':
for i in self.signalsInstances:
self.signalsInstances[i].getAns(data)
ans[i]=self.signalsInstances[i].analizMetrics
return ans
# In[10]:
reqSig={
'BB1':{
'className':signal_BB,
'indParams':{'MeanType':'SMA','window':15,'valueType':'close','kDev':2.5},
'signalParams':{'source':'close','target':'close'},
'batchSize':15
},
'BB2':{
'className':signal_BB,
'indParams':{'MeanType':'SMA','window':15,'valueType':'close','kDev':2.5},
'signalParams':{'source':'close','target':'close'},
'batchSize':20
}
}
# In[11]:
reqSig.values()
# In[12]:
testh=signalAgrigator(df_candle[:99999],reqSig,'online','ohcl',30)
# In[13]:
testh.signalsInstances['BB1'].__dict__
# In[ ]:
testh.getAns(df_candle[:100])
# In[ ]:
testh.signalsInstances['BB1'].__dict__
# In[ ]:
# In[ ]:

View File

@@ -0,0 +1,249 @@
#!/usr/bin/env python
# coding: utf-8
# In[2]:
import pandas as pd
import datetime
import numpy as np
import CoreTraidMath
import CoreDraw
from tqdm import tqdm
from indicators_v2 import *
# In[3]:
df_candle = pd.read_csv(r"../data/EURUSD_price_candlestick.csv")
df_candle.rename(columns={'timestamp': 'date'}, inplace=True)
df_candle
# In[4]:
class coreSignalTrande:
def __init__(self, name: str, req: dict, dataType: str):
self.name = name
self.agrigateInds = self.createIndicatorsInstance(req)
self.params = req['params']
self.dataType = dataType
def createIndicatorsInstance(self,req: dict) -> dict:
return indicatorsAgrigator(req['indicators'])
def getIndAns(self, dataDict: dict) -> dict:
return self.agrigateInds.getAns(dataDict)
def getAns(self, data: pd.DataFrame(), indDataDict: dict) -> dict:
return self.getSigAns(data, self.getIndAns(indDataDict))
class sig_BB(coreSignalTrande):
"""
ind keys:
ind_BB
"""
def __init__(self, name: str, req:dict):
super().__init__(name, req, 'ochl')
def getSigAns(self, data: pd.DataFrame(), indAnsDict: dict) -> dict:
lastValue = data[self.params['source']].to_list()[-1]
if lastValue>indAnsDict['ind_BB']['pSTD'][-1]:
ans='down'
elif lastValue<indAnsDict['ind_BB']['mSTD'][-1]:
ans='up'
else:
ans='none'
return ans
# In[5]:
class signalsAgrigator:
def __init__ (self,req:dict):
self.signals = self.createSignalsInstance(req)
def createSignalsInstance(self, siganlsDict: dict) -> dict:
ans = {}
for i in siganlsDict.keys():
ans[i]=siganlsDict[i]['className'](name = i, req = siganlsDict[i])
return ans
def getAns(self, dataDict: dict) -> dict:
ans = {}
for i in dataDict.keys():
ans[i] = self.signals[i].getAns(data = dataDict[i]['signalData'],
indDataDict = dataDict[i]['indicatorData'])
return ans
# In[ ]:
# In[6]:
sigreq= {
'params':{'source':'close','target':'close'},
'indicators':{
'ind_BB':{
'className':ind_BB,
'params':{'MeanType':'SMA','window':15,'valueType':'close','kDev':2.5}
}
}
}
indReqDict ={'ind_BB':df_candle[:1000]}
# In[7]:
sigAgrReq = {
'sig_BB':{
'className':sig_BB,
'params':{'source':'close','target':'close'},
'indicators':{
'ind_BB':{
'className':ind_BB,
'params':{'MeanType':'SMA','window':15,'valueType':'close','kDev':2.5}
}
}
},
'sig_BB_2':{
'className':sig_BB,
'params':{'source':'close','target':'close'},
'indicators':{
'ind_BB':{
'className':ind_BB,
'params':{'MeanType':'SMA','window':30,'valueType':'close','kDev':2}
}
}
}
}
sigAgrData = {
'sig_BB':{
'signalData': df_candle[990:1000],
'indicatorData' :{'ind_BB': df_candle[:1000]}
},
'sig_BB_2':{
'signalData': df_candle[990:1000],
'indicatorData' :{'ind_BB': df_candle[:1000]}
}
}
# In[ ]:
# In[8]:
ttt=signalsAgrigator(sigAgrReq)
# In[9]:
ttt.__dict__
# In[10]:
ttt.signals['sig_BB'].__dict__
# In[11]:
ttt.getAns(sigAgrData)
# In[ ]:
# In[ ]:
# In[12]:
list({'ttt':2}.keys())[0]
# In[13]:
test = sig_BB('sig_BB', sigreq)
# In[14]:
test.__dict__
# In[ ]:
# In[15]:
test.agrigateInds.__dict__
# In[16]:
ians = test.getIndAns(indReqDict)
ians
# In[17]:
test.getAns(df_candle[:100],indReqDict)
# In[ ]:
# In[ ]:

View File

@@ -0,0 +1,360 @@
#!/usr/bin/env python
# coding: utf-8
# In[1]:
import pandas as pd
import datetime
import numpy as np
import random
from signals import * #потом удалить
# In[2]:
class trandeVoter():
def __init__(self,name):
self.name = name # просто имя
self.trandeValuesList = ['up','none','down'] #словарь трегдов
self.matrixAmounts = None # матрица сумм
self.keysMatrixAmounts = None #ключи матрицы сумм, техническое поле
self.matrixProbability = None # матрица вероятностей
#функция которая создает df с заданным набором колонок и индексов. индексы - уникальные соотношения
def createDFbyNames(self, namesIndex, namesColoms,defaultValue=0.0):
df = pd.DataFrame(dict.fromkeys(namesColoms, [defaultValue]*pow(3,len(namesIndex))),
index=pd.MultiIndex.from_product([self.trandeValuesList]*len(namesIndex), names=namesIndex)
#,columns=namesColoms
)
return(df)
#создание матрицы сумм с дефолтным значением
def createMatrixAmounts(self,namesIndex: list) -> pd.DataFrame():
self.matrixAmounts = self.createDFbyNames(namesIndex,self.trandeValuesList,0)
self.keysMatrixAmounts = self.matrixAmounts.to_dict('tight')['index_names']
self.createMatrixProbability(namesIndex)
return(self.matrixAmounts)
#создание матрицы вероятностей с дефолтным значением
def createMatrixProbability(self,namesIndex: list) -> pd.DataFrame():
self.matrixProbability = self.createDFbyNames(namesIndex,self.trandeValuesList)
return(self.matrixProbability)
#установка значений в матрицы сумм. signalDecisions - значения индикаторов key:value; trande - реальное значение
def setDecisionBySignals(self,signalDecisions: dict,trande: str) -> None:
buff=[]
for i in self.keysMatrixAmounts:
buff.append(signalDecisions[i])
self.matrixAmounts.loc[tuple(buff),trande] += 1
#заполнение матрицы вероятностей вычисляемыми значениями из матрицы сумм
def generateMatrixProbability(self) -> None:
for i in range(self.matrixAmounts.shape[0]):
rowSum=sum(self.matrixAmounts.iloc[i])
self.matrixProbability.iloc[i]['up'] = (self.matrixAmounts.iloc[i]['up'] / rowSum)
self.matrixProbability.iloc[i]['none'] = self.matrixAmounts.iloc[i]['none'] / rowSum
self.matrixProbability.iloc[i]['down'] = self.matrixAmounts.iloc[i]['down'] / rowSum
#получение рещения из матрицы вероятностей по заданным значениям сигналов
def getDecisionBySignals(self,signalDecisions: dict) -> dict:
ans = {}
spliceSearch =self.matrixProbability.xs(tuple(signalDecisions.values()),
level=list(signalDecisions.keys())
)
ans['probability'] = spliceSearch.to_dict('records')[0]
ans['trande'] = spliceSearch.iloc[0].idxmax()
return ans
#получение матриц вероятностей и суммы в видей словарей
def getMatrixDict(self) -> dict:
ans={}
ans['amounts'] = self.matrixAmounts.to_dict('tight')
ans['probability'] = self.matrixProbability.to_dict('tight')
return ans
#установка матриц вероятностей и суммы в видей словарей
def setMatrixDict(self,matrixDict: dict) -> dict:
if matrixDict['amounts'] != None:
self.matrixAmounts = pd.DataFrame.from_dict(y['amounts'], orient='tight')
if matrixDict['probability'] != None:
self.matrixProbability = pd.DataFrame.from_dict(y['probability'], orient='tight')
# In[3]:
reqSig={
'BB1':{
'className':signal_BB,
'indParams':{'MeanType':'SMA','window':15,'valueType':'close','kDev':2.5},
'signalParams':{'source':'close','target':'close'},
'batchSize':15
},
'BB2':{
'className':signal_BB,
'indParams':{'MeanType':'SMA','window':15,'valueType':'close','kDev':2.5},
'signalParams':{'source':'close','target':'close'},
'batchSize':20
}
}
# In[4]:
reqDS={'BB1':'up','BB2':'none'}
# In[7]:
reqCreate=list(reqSig.keys())
reqCreate
# In[8]:
t=trandeVoter('piu')
o=t.createMatrixAmounts(['BB1', 'BB2'])
o
# In[9]:
for i in range(100000):
t.setDecisionBySignals({'BB1':random.choice(['up','down','none']),
'BB2':random.choice(['up','down','none'])},
random.choice(['up','down','none']))
# In[10]:
t.matrixAmounts
# In[11]:
t.generateMatrixProbability()
# In[577]:
t.matrixProbability
# In[14]:
t.setMatrixDict(y)
# In[15]:
t.getDecisionBySignals(reqDS)
# In[ ]:
# In[ ]:
# In[13]:
y = t.getMatrixDict()
y
# In[16]:
ddf = pd.DataFrame.from_dict(y['amounts'], orient='tight')
ddf
# In[ ]:
# In[ ]:
# In[ ]:
# In[ ]:
# In[17]:
t.matrixProbability.iloc[0]['up'] = (t.matrixProbability.iloc[0]['up'] / (sum(t.matrixProbability.iloc[0])))
t.matrixProbability
# In[ ]:
# In[ ]:
# In[ ]:
# In[18]:
t.matrixProbability['trandе']
# In[19]:
random.choice(['up','down','none'])
# In[20]:
t.setDecisionBySignals(reqDS,'up')
# In[21]:
#t.matrixAmounts.at(bbb,'up')
t.matrixAmounts.iloc[0]
# In[22]:
for i in t.matrixAmounts.iloc[0]:
print (i)
# In[23]:
(t.matrixAmounts.iloc[0]).idxmax()
# In[24]:
t.matrixAmounts
# In[ ]:
# In[25]:
o.xs(('up','down'), level=['BB1','BB2'])['up'].iloc[0]
#oldValue = o.xs(('up','down'), level=['BB1','BB2'])['up']
#o=o.replace(oldValue,oldValue.iloc[0]+1)
#o.xs(('up','down'), level=['BB1','BB2'])
# In[26]:
o.xs(('up','down'), level=['BB1','BB2'], drop_level=False)#.iloc[0].loc['up']=2#.at['up']=4
# In[27]:
o.xs(('up','down'), level=['BB1','BB2']).iloc[0].at['up']
# In[28]:
o.loc['up'].loc['down']
# In[29]:
bbb=tuple(['up','down'])
bbb
# In[30]:
o.loc[bbb,]
# In[31]:
o.at[bbb, 'up']+=1
o
# In[32]:
o.loc[bbb]
# In[33]:
dict(zip(['a','b','c'], [1,2,3]))
# In[ ]:

View File

@@ -0,0 +1,427 @@
#!/usr/bin/env python
# coding: utf-8
# In[1]:
import pandas as pd
import datetime
import numpy as np
from signals import * #потом удалить
# In[2]:
class voter_v2():
def __init__(self,name):
self.name=name
pass
def createPredictMatrixBySignals(self,signalsName):
pass
# In[ ]:
# In[3]:
reqSig={
'BB1':{
'className':signal_BB,
'indParams':{'MeanType':'SMA','window':15,'valueType':'close','kDev':2.5},
'signalParams':{'source':'close','target':'close'},
'batchSize':15
},
'BB2':{
'className':signal_BB,
'indParams':{'MeanType':'SMA','window':15,'valueType':'close','kDev':2.5},
'signalParams':{'source':'close','target':'close'},
'batchSize':20
}
}
# In[4]:
reqCreate=reqSig.keys()
reqCreate
# In[5]:
class Voter():
def __init__ (self, name=''):
self.name=name
self.mop={
'up': pd.DataFrame(),
'down':pd.DataFrame(),
'none':pd.DataFrame()
}
self.value={}
self.decision=''
self.real_decision=''
self.keys=[]
self.slice_dict={}
def addValue(self, dic_value):
self.value=dic_value
self.checkForNew()
self.setSlice()
self.getDecision()
def checkForNew(self):
if not (list(self.value.keys()) == self.keys):
self.createNewMop(list(self.value.keys()))
def createNewMop(self,missing_indicators):
print('reassembly mop')
new_columns= (missing_indicators)
#new_columns=new_columns.append(['value','p'])
n=len(new_columns)
start_value=-1
variator=3
new_lst=[]
buf_lst=[]
for i in range(n):
buf_lst.append(start_value)
for i in range(pow(variator,n)):
new_lst.append(buf_lst.copy())
for j in range(n):
for i in range(len(new_lst)):
dob_iterator=(i // pow(variator,j)) % variator
new_lst[i][j]=new_lst[i][j] + dob_iterator
#print (new_columns)
self.keys=new_columns
new_columns = new_columns+['amount']+['percentage']
for i in new_lst:
i = i.extend([0,0])
#i = i.append(0)
#print(new_lst)
#print(new_columns)
new_df=pd.DataFrame(new_lst,columns=new_columns)
self.mop['up']=pd.DataFrame.from_dict(new_df.to_dict())
self.mop['down']=pd.DataFrame.from_dict(new_df.to_dict())
self.mop['none']=pd.DataFrame.from_dict(new_df.to_dict())
def setSlice(self):
row_flg=True
self.slice_dict={}
for j in self.mop.keys():
for index, row in self.mop[j].iterrows():
for key, value in self.value.items():
if value != row[key]:
#print('fasle ',key,value,row[key])
row_flg=False
break
if row_flg:
self.slice_dict[j]=dict(row)
#print(j,dict(row))
row_flg=True
def getDecision (self):
max_value=0
for key, value in self.slice_dict.items():
if value['amount'] >= max_value:
max_value = value['amount']
self.decision = key
return self.decision
def setDecision (self,real_decision):
self.real_decision=real_decision
self.updMop()
self.slice_dict[real_decision]['amount']+=1
def updMop(self):
row_flg=True
for index, row in self.mop[self.real_decision].iterrows():
for key, value in self.value.items():
if value != row[key]:
row_flg=False
break
if row_flg:
#self.slice_dict[j]=dict(row)
row['amount']=row['amount']+1
row_flg=True
# In[6]:
test_dic_value_1={'lupa':1 }
test_dic_value_2={'lupa':1 , 'pupa':1}
test_dic_value_3={'lupa':1 , 'pupa':1 , 'zalupa':1 , 'zapupa':1 }
test_dic_value_4={'lupa':1 , 'pupa':1 , 'zalupa':1 , 'zapupa':-1 }
# In[7]:
test=Voter('huita')
test.addValue(test_dic_value_2)
test.decision
test.getDecision()
# In[8]:
test.setDecision('down')
test.getDecision()
# In[9]:
test.slice_dict
# In[ ]:
# In[10]:
import pickle
# In[11]:
dictionary_data = {"a": 1, "b": 2}
a_file = open("data.pkl", "wb")
pickle.dump(dictionary_data, a_file)
a_file.close()
a_file = open("data.pkl", "rb")
output = pickle.load(a_file)
print(output)
a_file.close()
# In[ ]:
# In[ ]:
# In[12]:
arrays = [
["bar", "bar", "baz", "baz", "foo", "foo", "qux", "qux"],
["one", "two", "one", "two", "one", "two", "one", "two"],
]
tuples = list(zip(*arrays))
tuples
# In[13]:
index = pd.MultiIndex.from_tuples(tuples, names=["first", "second"])
# In[14]:
s = pd.DataFrame(np.random.randn(8), index=index)
s
# In[15]:
s.to_dict()
# In[16]:
s.loc(('bar', 'one'))
# In[18]:
iterables = [["up", "down", "none"], ["up", "down", "none"]]
df = pd.DataFrame({'col1': np.random.randn(9),'col2': np.random.randn(9)}, index=pd.MultiIndex.from_product(iterables, names=["first", "second"]))
df
# In[19]:
df.__dict__
# In[ ]:
# In[ ]:
# In[20]:
def createDF(namesIndex, namesColoms):
trandeValuesList = ['up','none','down']
colomsName_lvl = ['trande','amaunt','probability']
#micolumns = pd.MultiIndex.from_tuples(
#[('amaunt', 'up'), ('amaunt', 'none'), ('amaunt', 'down'), ('trande',),('probability',)], names=["lvl0", "lvl1"]
#)
df = pd.DataFrame({
'trande': [None]*pow(3,len(namesIndex)),
'amaunt': [None]*pow(3,len(namesIndex)),
'probability': [None]*pow(3,len(namesIndex))
},
index=pd.MultiIndex.from_product([trandeValuesList]*len(namesIndex), names=namesIndex)
,columns=namesColoms
)
return(df)
# In[21]:
dd=createDF( ['1','2','3'],['trande','amaunt','probability'] )
dd
# In[22]:
df.xs(('up','down'), level=['first','second'])
# In[23]:
dd['trande']
# In[24]:
tvl = ['up','none','down']
colomsName_lvl = ['trande','amaunt','probability']
# In[25]:
tuplesCol = list(zip(['amaunt']*3,tvl))
tuplesCol
# In[26]:
df.loc['up','down']
# In[27]:
df.xs(('up','down'), level=['first','second']).iloc[0]
# In[28]:
df_d=df.to_dict('tight')
df_d
# In[29]:
df_d['index_names']
# In[30]:
ddf = pd.DataFrame.from_dict(df_d, orient='tight')
ddf
# In[31]:
tuple([1,2,3])
# In[32]:
ddf.xs(('up','down'), level=['first','second']).iloc[0]
# In[ ]:

View File

@@ -0,0 +1,221 @@
#!/usr/bin/env python
# coding: utf-8
# In[1]:
import pandas as pd
import datetime
import numpy as np
import plotly as pl
import plotly.graph_objs as go
import matplotlib.pyplot as plt
import random
import datetime
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import plotly
import plotly.graph_objs as go
from plotly.offline import init_notebook_mode, iplot
from plotly.subplots import make_subplots
init_notebook_mode()
#import CoreTraidMath
import plotly.express as px
# In[ ]:
# In[2]:
class agrigateFig():
def __init__(self,data=[],needDraw=False ,subplot_titles=None):
self.data=data
self.ans=self.getAgrPlt()
if needDraw:
self.subplot_titles=subplot_titles
self.fig=coreDraw(self.ans,True,self.subplot_titles)
def getAgrPlt(self):
count=0
ans=[]
for i in self.data:
count=count+1
if type(i)==list:
for g in i:
for j in g.figDict:
ans.append(j)
ans[-1]['row']=count
else:
for j in i.figDict:
ans.append(j)
ans[-1]['row']=count
return ans
# In[3]:
class corePlt():
def __init__(self, params={
'vtype':'',
'df':pd.DataFrame(),
'row':1,
'col':1,
'name':''
}):
self.vtype=params['vtype']
self.df=params['df']
self.row=params['row']
self.col=params['col']
self.name=params['name']
if 'colorType' in params.keys():
self.colorType=params['colorType']
class coreDraw():
def __init__(self, data=[],needShow=False,subplot_titles={}):
self.data=self.getPlts(data)
self.needShow=needShow
self.subplot_titles=subplot_titles
self.ans=self.getAns()
def getBarColorList(self,l,colorType):
if colorType=='diffAbs':
ans=['green']
for i in range(1,len(l)):
if abs(l[i])>abs(l[i-1]):
ans.append('green')
else:
ans.append('red')
elif colorType=='diff':
ans=['green']
for i in range(1,len(l)):
if (l[i])>(l[i-1]):
ans.append('green')
else:
ans.append('red')
elif colorType=='normal':
ans=[]
for i in range(len(l)):
ans.append('gray')
return ans
def getPlts(self, data):
ans=None
if type(data)==list:
ans=[]
for i in data:
ans.append(corePlt(i))
else:
ans=[corePlt(data)]
return ans
def getAns(self):
'''
data list
vtype
df
row=1
col=1
name
'''
ans=None
maxRow=1
maxCol=1
for i in self.data:
if i.row > maxRow:
maxRow =i.row
if i.col > maxCol:
maxCol =i.col
fig = make_subplots(
rows=maxRow,
cols=maxCol,
shared_xaxes=True,
vertical_spacing=0.1,
shared_yaxes=True,
#horizontal_spacing=0.02,
#column_widths=[]
subplot_titles=self.subplot_titles
)
fig.update_layout(xaxis_rangeslider_visible=False)
fig.update_layout(barmode='relative')
for i in self.data:
if i.vtype=='Scatter':
fig.add_trace(go.Scatter(x=i.df['date'],y=i.df['value'],name=i.name), row=i.row, col=i.col)
elif i.vtype=='OCHL':
fig.add_trace(go.Candlestick(
x=i.df['date'],
open=i.df['open'],
high=i.df['high'],
low=i.df['low'],
close=i.df['close'],
name=i.name),
row=i.row, col=i.col
)
elif i.vtype=='Bars':
for j in i.df.keys():
if j!='date':
try:
colorType=i.colorType
except:
colorType='normal'
colors=self.getBarColorList(i.df[j],colorType)
fig.add_trace(go.Bar(x=i.df['date'], y=i.df[j],name=j,marker_color=colors),row=i.row, col=i.col)
ans=fig
if self.needShow:
plotly.offline.iplot(fig)
return ans
# In[ ]:

View File

@@ -0,0 +1,145 @@
#!/usr/bin/env python
# coding: utf-8
# In[1]:
import pandas as pd
import datetime
import numpy as np
import uuid
# In[2]:
class DealManager():
def __init__(self):
self.commission=0.04
self.columns=['uuid','figi','amount','startPrice','profit']
self.deals = pd.DataFrame(columns=self.columns)
self.deals = self.deals.set_index('uuid')
def findDealByPriceAndFig(self,price,figi):
ans=None
for i in range(self.deals.shape[0]):
if self.deals.iloc[i].startPrice == price and self.deals.iloc[i].figi == figi:
ans = self.deals.iloc[i].name
break
return ans
def openDeal(self,figi,startPrice,amount=1):
desiredDeal=self.findDealByPriceAndFig(startPrice,figi)
if desiredDeal == None:
newDealDict={
'uuid':[str(uuid.uuid4())],
'figi':[figi],
'startPrice':[startPrice],
'amount':[amount]
}
#newDealDict['profit']=[startPrice*pow(1+self.commission,2)]
newDeal=pd.DataFrame.from_dict(newDealDict).set_index('uuid')
self.deals=pd.concat([self.deals, newDeal])
else:
self.deals.at[desiredDeal,'amount'] += amount
def closeDeal(self,uuid,amount):
desiredDeal=self.deals.loc[uuid]
if desiredDeal.amount - amount == 0:
self.deals = self.deals.drop(labels = [uuid],axis = 0)
else:
self.deals.at[uuid,'amount'] -= amount
#self.deals.loc[uuid].amount = desiredDeal.amount - amount
# In[3]:
t=DealManager()
t.__dict__
# In[ ]:
# In[4]:
t.deals.shape[0]
# In[5]:
t.openDeal('huigi',100,1)
t.openDeal('huigi',100,3)
t.openDeal('huigi1',100,3)
t.openDeal('huigi1',200,3)
# In[6]:
t.deals
# In[7]:
t.deals[t.deals.figi == 'huigi1']
# In[ ]:
# In[8]:
for i in range(t.deals.shape[0]):
print(t.deals.iloc[i])
# In[9]:
t.findDealByPriceAndFig
# In[10]:
t.closeDeal('78228979-3daf-470a-9c2a-8db180c8c3b0',1)
t.deals
# In[11]:
t.deals.iloc[0].name
# In[12]:
a=2
a==None
# In[ ]:

View File

@@ -0,0 +1,358 @@
#!/usr/bin/env python
# coding: utf-8
# In[4]:
import pandas as pd
import datetime
import numpy as np
import pickle
from signals import *
from dealManager import *
from trandeVoter import *
from riskManager import riskManager
# In[5]:
df_candle = pd.read_csv("../data/EURUSD_price_candlestick.csv")
df_candle.rename(columns={'timestamp': 'date'}, inplace=True)
df_candle
# In[6]:
class decsionManager():
def __init__(self,name):
self.name = name
self.RM = riskManager()
self.DM = DealManager()
self.TV = trandeVoter(name)
self.SA = signalAgrigator()
pass
#вытащенный из signalAgrigator метод теста для сигналов
def getSignalTest(self,data: pd.DataFrame(),reqSig: dict, batchSize=30, dataType='candel') -> dict:
self.SA.mode = 'retroFast'
t.SA.createSingnalInstances(
data = data,
dictAgrigSignal = reqSig,
dataType='candel',
batchSize=30
)
ans = t.SA.getAns(data)
return ans
#метод для генерации матрицы вероятностей.
def generateMatrixProbability(self,
data: pd.DataFrame(),
reqSig: dict,
target: str,
batchSize=30,
#dataType='candel'
):
data=data.reset_index(drop=True)
t.SA.createSingnalInstances(
data = data,
dictAgrigSignal = reqSig,
dataType='candel',
batchSize=batchSize
)
self.TV.createMatrixAmounts(reqSig.keys())
for i in range(data.shape[0]-batchSize-1):
sigAns=self.SA.getAns(data[i:i+batchSize])
rightAns=self.getRetroStepAns(data[target][i],data[target][i+1])
self.TV.setDecisionBySignals(self.KostilEbaniy(sigAns),rightAns)
self.TV.generateMatrixProbability()
#без коментариев блять
def KostilEbaniy(self,d):
ans={}
for i in d.keys():
if d[i] == 0:
ans[i] = 'none'
elif d[i] == 1:
ans[i] = 'up'
elif d[i] == -1:
ans[i] = 'down'
return ans
#тож понятная хуита
def getRetroStepAns(self, value1,value2):
if value1 == value2:
ans = 'none'
elif value1 < value2:
ans = 'up'
else:
ans = 'down'
return ans
#метод для онлай получения решения по сигналу
def getSignal(self,data: pd.DataFrame(),reqSig: dict, dataType='candel') -> dict:
data=data.reset_index(drop=True)
self.SA.mode = 'online'
t.SA.createSingnalInstances(
data = data,
dictAgrigSignal = reqSig,
dataType='candel',
batchSize=30
)
ans = t.SA.getAns(data)
return ans
#Создание сигналов. Вызывать перед getOnlineAns
def crateSignals(self,data: pd.DataFrame(),reqSig: dict, dataType='candel'):
data=data.reset_index(drop=True)
self.SA.mode = 'online'
t.SA.createSingnalInstances(
data = data,
dictAgrigSignal = reqSig,
dataType='candel',
batchSize=30
)
def getOnlineAns(self,data: pd.DataFrame(),price):
sigAns = self.SA.getAns(data)
prob = self.TV.getDecisionBySignals(sigAns)
ans = self.RM.getDecision(sigAns,prob,price)
return ans
# In[ ]:
# In[7]:
t= decsionManager('TEST')
# In[8]:
t.__dict__
# In[9]:
reqSig={
'BB1':{
'className':signal_BB,
'indParams':{'MeanType':'SMA','window':15,'valueType':'close','kDev':2.5},
'signalParams':{'source':'close','target':'close'},
'batchSize':15
},
'BB2':{
'className':signal_BB,
'indParams':{'MeanType':'SMA','window':15,'valueType':'close','kDev':2.5},
'signalParams':{'source':'close','target':'close'},
'batchSize':20
}
}
# In[10]:
reqSig.keys()
# In[11]:
t.SA.__dict__
# In[12]:
t.generateMatrixProbability(df_candle[:10000],reqSig,'close',40)
# In[ ]:
# In[ ]:
# In[ ]:
# In[ ]:
# In[ ]:
# In[ ]:
# In[13]:
mop = t.TV.matrixProbability
mop
# In[ ]:
# In[ ]:
# In[14]:
t.getSignal(df_candle[:10000],reqSig)
# In[15]:
t.getSignalTest(df_candle[:10000],reqSig,40)
# In[ ]:
# In[16]:
t.SA.createSingnalInstances(
data = df_candle[:10000],
dictAgrigSignal = reqSig,
dataType='candel',
batchSize=30
)
# In[ ]:
# In[ ]:
# In[ ]:
# In[17]:
reqSig={
'BB1':{
'className':signal_BB,
'indParams':{'MeanType':'SMA','window':15,'valueType':'close','kDev':2.5},
'signalParams':{'source':'close','target':'close'},
'batchSize':15
},
'BB2':{
'className':signal_BB,
'indParams':{'MeanType':'SMA','window':15,'valueType':'close','kDev':2.5},
'signalParams':{'source':'close','target':'close'},
'batchSize':20
}
}
# In[18]:
t=decsionManager(reqSig)
# In[ ]:
# In[ ]:
import pickle
# In[ ]:
dictionary_data = {"a": 1, "b": 2}
a_file = open("data.pkl", "wb")
pickle.dump(reqSig, a_file)
a_file.close()
a_file = open("data.pkl", "rb")
output = pickle.load(a_file)
print(output)
a_file.close()
# In[ ]:

View File

@@ -0,0 +1,382 @@
#!/usr/bin/env python
# coding: utf-8
# In[5]:
import os
import pandas as pd
import datetime
import numpy as np
from tqdm import tqdm
from indicators_v2 import *
from signals_v2 import *
from dealManager import *
from trandeVoter import *
from riskManager import *
import pickle
# In[6]:
df_candle = pd.read_csv("../data/EURUSD_price_candlestick.csv")
df_candle.rename(columns={'timestamp': 'date'}, inplace=True)
df_candle
# In[7]:
df_candle['close']
# In[8]:
class decsionManager:
'''
sigAgrReq = {
'sig_BB':{
'className':sig_BB,
'params':{'source':'close','target':'close'},
'indicators':{
'ind_BB':{
'className':ind_BB,
'params':{'MeanType':'SMA','window':30,'valueType':'close','kDev':2.5}
}
}
},
'sig_BB_2':{
'className':sig_BB,
'params':{'source':'close','target':'close'},
'indicators':{
'ind_BB':{
'className':ind_BB,
'params':{'MeanType':'SMA','window':30,'valueType':'close','kDev':2}
}
}
}
}
sigAgrData = {
'sig_BB':{
'signalData': df_candle[990:1000],
'indicatorData' :{'ind_BB': df_candle[:1000]}
},
'sig_BB_2':{
'signalData': df_candle[990:1000],
'indicatorData' :{'ind_BB': df_candle[:1000]}
}
}
sigAgrRetroTemplate = {
'sig_BB':{
'signalData': None,
'indicatorData' :{'ind_BB': None}
},
'sig_BB_2':{
'signalData': None,
'indicatorData' :{'ind_BB': None}
}
}
'''
def __init__(self,name, sigDict: dict):
self.RM = riskManager()
self.DM = DealManager()
self.TV = trandeVoter(name)
self.SA = signalsAgrigator(sigDict)
self.sigDict = sigDict
def getOnlineAns(self, signalsAns: dict, price: float) -> dict:
probabilityDecsion = self.TV.getDecisionBySignals(self.getSignalsAns(signalsAns))
RMD = self.RM.getDecision(probabilityDecision=probabilityDecsion, price=price, deals = self.DM.deals)
return RMD
def getSignalsAns(self, signalsDataDict: dict) -> dict:
return self.SA.getAns(signalsDataDict)
def getRightAns(self,value_1, value_2):
ans=''
if value_1 > value_2:
ans = 'down'
elif value_1 < value_2:
ans = 'up'
else:
ans = 'none'
return ans
def getRetroTrendAns(self, retroTemplateDict: dict, data: pd.DataFrame(), window: int) -> list:
reqSig={}
ans = {
'signalsAns':[],
'rightAns':[]
}
target = ''
for k in tqdm(range(data.shape[0]-window-1)):
for i in retroTemplateDict.keys():
reqSig[i] = {'signalData': data[k:k+window], 'indicatorData':{}}
target = self.SA.signals[i].params['target']
for j in retroTemplateDict[i]['indicatorData'].keys():
reqSig[i]['indicatorData'][j] = data[k:k+window]
sigAns = self.getSignalsAns(reqSig)
rightAns = self.getRightAns(data[target][k], data[target][k+1])
ans['signalsAns'].append(sigAns)
ans['rightAns'].append(rightAns)
return ans
def generateMatrixProbabilityFromDict(self, dictSignals: dict) -> dict:
self.TV.createMatrixAmounts(dictSignals['signalsAns'][0].keys())
for i in range(len(dictSignals['signalsAns'])):
self.TV.setDecisionBySignals(signalDecisions = dictSignals['signalsAns'][i],
trande = dictSignals['rightAns'][i])
self.TV.generateMatrixProbability()
def createDump(self,postfix='') -> str:
dataDict = {
'RM':self.RM,
'DM':self.DM,
'TV':self.TV,
'SA':self.SA,
'sigDict':self.sigDict
}
fileName='data_'+postfix+'.pickle'
with open(fileName, 'wb') as f:
pickle.dump(dataDict, f)
return os.path.abspath(fileName)
def loadDump(self,path: str) -> None:
with open(path, 'rb') as f:
dataDict = pickle.load(f)
self.RM = dataDict['RM']
self.DM = dataDict['DM']
self.TV = dataDict['TV']
self.SA = dataDict['SA']
self.sigDict = dataDict['sigDict']
# In[9]:
sigAgrReq = {
'sig_BB':{
'className':sig_BB,
'params':{'source':'close','target':'close'},
'indicators':{
'ind_BB':{
'className':ind_BB,
'params':{'MeanType':'SMA','window':30,'valueType':'close','kDev':2.5}
}
}
},
'sig_BB_2':{
'className':sig_BB,
'params':{'source':'close','target':'close'},
'indicators':{
'ind_BB':{
'className':ind_BB,
'params':{'MeanType':'SMA','window':30,'valueType':'close','kDev':2}
}
}
}
}
sigAgrData = {
'sig_BB':{
'signalData': df_candle[990:1000],
'indicatorData' :{'ind_BB': df_candle[:1000]}
},
'sig_BB_2':{
'signalData': df_candle[990:1000],
'indicatorData' :{'ind_BB': df_candle[:1000]}
}
}
sigAgrRetroTemplate = {
'sig_BB':{
'signalData': None,
'indicatorData' :{'ind_BB': None}
},
'sig_BB_2':{
'signalData': None,
'indicatorData' :{'ind_BB': None}
}
}
# In[10]:
test = decsionManager('Pipa', sigAgrReq)
# In[11]:
test.__dict__
# In[12]:
test.TV.__dict__
# In[13]:
test.SA.signals['sig_BB'].params['target']
# In[14]:
test.getSignalsAns(sigAgrData)
# In[15]:
#test.loadDump('C:\\Users\\Redsandy\\PyProj\\Trade\\MVP\\data_pupa.pickle')
# In[16]:
uuu = test.getRetroTrendAns(sigAgrRetroTemplate,df_candle[:5000],40)
uuu
# In[17]:
test.generateMatrixProbabilityFromDict(uuu)
# In[18]:
test.TV.__dict__
# In[19]:
test.getOnlineAns(sigAgrData, 0.0)
# In[20]:
(test.DM.deals).shape
# In[21]:
test.createDump('pupa')
# In[ ]:
# In[22]:
with open('C:\\Users\\Redsandy\\PyProj\\Trade\\MVP\\data_pupa.pickle', 'rb') as f:
data_new = pickle.load(f)
data_new
# In[ ]:
# In[ ]:
# In[ ]:
# In[ ]:
# In[ ]:
# In[ ]:
# In[ ]:
# In[ ]:
# In[ ]:
# In[ ]:

View File

@@ -0,0 +1,194 @@
#!/usr/bin/env python
# coding: utf-8
# In[8]:
import pandas as pd
import datetime
import numpy as np
import CoreTraidMath
# In[9]:
df_candle = pd.read_csv(r"../data/EURUSD_price_candlestick.csv")
df_candle.rename(columns={'timestamp': 'date'}, inplace=True)
df_candle
# In[10]:
class coreIndicator():
def __init__(self,options: dict, dataType: str = None, predictType: str = None, name: str = None):
self.options = options
self.dataType = dataType #ochl
self.predictType = predictType #trend
def getAns(self, data: pd.DataFrame() ):
return "ERROR"
# In[11]:
class ind_BB(coreIndicator):
"""
options
MeanType -> SMA
window -> int
valueType -> str: low, high, open, close
kDev -> float
"""
def __init__(self,options: dict,name = None):
super().__init__(
options = options,
dataType = 'ochl',
predictType = 'trend',
name = name
)
def getAns(self, data: pd.DataFrame()):
data=data.reset_index(drop=True)
ans={}
opMA={'dataType':'ohcl',
'action':'findMean',
'actionOptions':{
'MeanType':self.options['MeanType'],
'valueType':self.options['valueType'],
'window':self.options['window']
}
}
ans['BB']=CoreTraidMath.CoreMath(data,opMA).ans
opSTD={'dataType':'ohcl',
'action':'findSTD',
'actionOptions':{'valueType':self.options['valueType'],'window':self.options['window']}
}
ans['STD']=CoreTraidMath.CoreMath(data,opSTD).ans
ans['pSTD']=ans['BB']+ans['STD']*self.options['kDev']
ans['mSTD']=ans['BB']-ans['STD']*self.options['kDev']
ans['x']=np.array(data['date'][self.options['window']-1:].to_list())
self.ans= ans
return ans
# In[12]:
class indicatorsAgrigator:
def __init__ (self,indDict={}):
self.indDict = indDict
self.indInst = {}
self.ans={}
self.createIndicatorsInstance()
def createIndicatorsInstance(self):
for i in self.indDict.keys():
self.indInst[i]=self.indDict[i]['className'](self.indDict[i]['params'])
def getAns(self,dataDict={}):
ans={}
for i in dataDict.keys():
ans[i] = self.indInst[i].getAns(dataDict[i])
return ans
# In[13]:
indicators = {
'ind_BB':{
'className':ind_BB,
'params':{'MeanType':'SMA','window':15,'valueType':'close','kDev':2.5}
}
}
dataDic={
'ind_BB':df_candle[:1000]
}
# In[ ]:
# In[14]:
ia= indicatorsAgrigator(indicators)
# In[15]:
ia.__dict__
# In[16]:
ia.indInst['ind_BB'].__dict__
# In[17]:
ia.getAns(dataDict=dataDic)
# In[ ]:
# In[ ]:
# In[18]:
op = {'MeanType':'SMA','window':5,'valueType':'low','kDev':2}
# In[19]:
t = ind_BB(op)
# In[20]:
t.getAns(df_candle[:100])
# In[21]:
t.__dict__
# In[ ]:
# In[ ]: