diff --git a/smartmoneyconcepts/smc.py b/smartmoneyconcepts/smc.py index 1f49501..72cbbb5 100644 --- a/smartmoneyconcepts/smc.py +++ b/smartmoneyconcepts/smc.py @@ -473,9 +473,12 @@ def ob( ob[obIndex] = 1 top_arr[obIndex] = obTop bottom_arr[obIndex] = obBtm - obVolume[obIndex] = _volume[close_index] + _volume[close_index - 1] + _volume[close_index - 2] - lowVolume[obIndex] = _volume[close_index - 2] - highVolume[obIndex] = _volume[close_index] + _volume[close_index - 1] + vol_cur = _volume[close_index] + vol_prev1 = _volume[close_index - 1] if close_index >= 1 else 0.0 + vol_prev2 = _volume[close_index - 2] if close_index >= 2 else 0.0 + obVolume[obIndex] = vol_cur + vol_prev1 + vol_prev2 + lowVolume[obIndex] = vol_prev2 + highVolume[obIndex] = vol_cur + vol_prev1 max_vol = max(highVolume[obIndex], lowVolume[obIndex]) percentage[obIndex] = (min(highVolume[obIndex], lowVolume[obIndex]) / max_vol * 100.0) if max_vol != 0 else 100.0 active_bullish.append(obIndex) @@ -529,9 +532,12 @@ def ob( ob[obIndex] = -1 top_arr[obIndex] = obTop bottom_arr[obIndex] = obBtm - obVolume[obIndex] = _volume[close_index] + _volume[close_index - 1] + _volume[close_index - 2] - lowVolume[obIndex] = _volume[close_index] + _volume[close_index - 1] - highVolume[obIndex] = _volume[close_index - 2] + vol_cur = _volume[close_index] + vol_prev1 = _volume[close_index - 1] if close_index >= 1 else 0.0 + vol_prev2 = _volume[close_index - 2] if close_index >= 2 else 0.0 + obVolume[obIndex] = vol_cur + vol_prev1 + vol_prev2 + lowVolume[obIndex] = vol_cur + vol_prev1 + highVolume[obIndex] = vol_prev2 max_vol = max(highVolume[obIndex], lowVolume[obIndex]) percentage[obIndex] = (min(highVolume[obIndex], lowVolume[obIndex]) / max_vol * 100.0) if max_vol != 0 else 100.0 active_bearish.append(obIndex) diff --git a/tests/unit_tests.py b/tests/unit_tests.py index 6684eda..464540b 100644 --- a/tests/unit_tests.py +++ b/tests/unit_tests.py @@ -6,13 +6,15 @@ import pandas as pd import unittest -sys.path.append(os.path.abspath("../")) +BASE_DIR = os.path.dirname(__file__) +sys.path.append(os.path.abspath(os.path.join(BASE_DIR, ".."))) from smartmoneyconcepts.smc import smc # define and import test data test_instrument = "EURUSD" instrument_data = f"{test_instrument}_15M.csv" -df = pd.read_csv(os.path.join("test_data", test_instrument, instrument_data)) +TEST_DATA_DIR = os.path.join(BASE_DIR, "test_data", test_instrument) +df = pd.read_csv(os.path.join(TEST_DATA_DIR, instrument_data)) df = df.set_index("Date") df.index = pd.to_datetime(df.index) @@ -24,7 +26,7 @@ def test_fvg(self): start_time = time.time() fvg_data = smc.fvg(df) fvg_result_data = pd.read_csv( - os.path.join("test_data", test_instrument, "fvg_result_data.csv") + os.path.join(TEST_DATA_DIR, "fvg_result_data.csv") ) print("fvg test time: ", time.time() - start_time) pd.testing.assert_frame_equal(fvg_data, fvg_result_data, check_dtype=False) @@ -33,7 +35,7 @@ def test_fvg_consecutive(self): start_time = time.time() fvg_data = smc.fvg(df, join_consecutive=True) fvg_consecutive_result_data = pd.read_csv( - os.path.join("test_data", test_instrument, "fvg_consecutive_result_data.csv") + os.path.join(TEST_DATA_DIR, "fvg_consecutive_result_data.csv") ) print("fvg consecutive test time: ", time.time() - start_time) pd.testing.assert_frame_equal(fvg_data, fvg_consecutive_result_data, check_dtype=False) @@ -42,9 +44,7 @@ def test_swing_highs_lows(self): start_time = time.time() swing_highs_lows_data = smc.swing_highs_lows(df, swing_length=5) swing_highs_lows_result_data = pd.read_csv( - os.path.join( - "test_data", test_instrument, "swing_highs_lows_result_data.csv" - ) + os.path.join(TEST_DATA_DIR, "swing_highs_lows_result_data.csv") ) print("swing_highs_lows test time: ", time.time() - start_time) pd.testing.assert_frame_equal(swing_highs_lows_data, swing_highs_lows_result_data, check_dtype=False) @@ -54,7 +54,7 @@ def test_bos_choch(self): swing_highs_lows_data = smc.swing_highs_lows(df, swing_length=5) bos_choch_data = smc.bos_choch(df, swing_highs_lows_data) bos_choch_result_data = pd.read_csv( - os.path.join("test_data", test_instrument, "bos_choch_result_data.csv") + os.path.join(TEST_DATA_DIR, "bos_choch_result_data.csv") ) print("bos_choch test time: ", time.time() - start_time) pd.testing.assert_frame_equal( @@ -66,17 +66,32 @@ def test_ob(self): swing_highs_lows_data = smc.swing_highs_lows(df, swing_length=5) ob_data = smc.ob(df, swing_highs_lows_data) ob_result_data = pd.read_csv( - os.path.join("test_data", test_instrument, "ob_result_data.csv") + os.path.join(TEST_DATA_DIR, "ob_result_data.csv") ) print("ob test time: ", time.time() - start_time) pd.testing.assert_frame_equal(ob_data, ob_result_data, check_dtype=False) + def test_ob_early_data(self): + """Ensure early candles do not cause index errors in OB calculation.""" + short_df = pd.DataFrame( + { + "open": [1.0, 1.1, 1.2], + "high": [1.05, 1.15, 1.25], + "low": [0.95, 1.05, 1.15], + "close": [1.02, 1.14, 1.24], + "volume": [5, 6, 7], + } + ) + swing = smc.swing_highs_lows(short_df, swing_length=1) + ob_df = smc.ob(short_df, swing) + self.assertEqual(len(ob_df), len(short_df)) + def test_liquidity(self): start_time = time.time() swing_highs_lows_data = smc.swing_highs_lows(df, swing_length=5) liquidity_data = smc.liquidity(df, swing_highs_lows_data) liquidity_result_data = pd.read_csv( - os.path.join("test_data", test_instrument, "liquidity_result_data.csv") + os.path.join(TEST_DATA_DIR, "liquidity_result_data.csv") ) print("liquidity test time: ", time.time() - start_time) pd.testing.assert_frame_equal(liquidity_data, liquidity_result_data, check_dtype=False) @@ -86,9 +101,7 @@ def test_previous_high_low(self): start_time = time.time() previous_high_low_data = smc.previous_high_low(df, time_frame="4h") previous_high_low_result_data = pd.read_csv( - os.path.join( - "test_data", test_instrument, "previous_high_low_result_data_4h.csv" - ) + os.path.join(TEST_DATA_DIR, "previous_high_low_result_data_4h.csv") ) print("previous_high_low test time: ", time.time() - start_time) pd.testing.assert_frame_equal(previous_high_low_data, previous_high_low_result_data, check_dtype=False) @@ -97,9 +110,7 @@ def test_previous_high_low(self): start_time = time.time() previous_high_low_data = smc.previous_high_low(df, time_frame="1D") previous_high_low_result_data = pd.read_csv( - os.path.join( - "test_data", test_instrument, "previous_high_low_result_data_1D.csv" - ) + os.path.join(TEST_DATA_DIR, "previous_high_low_result_data_1D.csv") ) print("previous_high_low test time: ", time.time() - start_time) pd.testing.assert_frame_equal(previous_high_low_data, previous_high_low_result_data, check_dtype=False) @@ -108,9 +119,7 @@ def test_previous_high_low(self): start_time = time.time() previous_high_low_data = smc.previous_high_low(df, time_frame="W") previous_high_low_result_data = pd.read_csv( - os.path.join( - "test_data", test_instrument, "previous_high_low_result_data_W.csv" - ) + os.path.join(TEST_DATA_DIR, "previous_high_low_result_data_W.csv") ) print("previous_high_low test time: ", time.time() - start_time) pd.testing.assert_frame_equal(previous_high_low_data, previous_high_low_result_data, check_dtype=False) @@ -119,7 +128,7 @@ def test_sessions(self): start_time = time.time() sessions = smc.sessions(df, session="London") sessions_result_data = pd.read_csv( - os.path.join("test_data", test_instrument, "sessions_result_data.csv") + os.path.join(TEST_DATA_DIR, "sessions_result_data.csv") ) print("sessions test time: ", time.time() - start_time) pd.testing.assert_frame_equal(sessions, sessions_result_data, check_dtype=False) @@ -129,7 +138,7 @@ def test_retracements(self): swing_highs_lows_data = smc.swing_highs_lows(df, swing_length=5) retracements = smc.retracements(df, swing_highs_lows_data) retracements_result_data = pd.read_csv( - os.path.join("test_data", test_instrument, "retracements_result_data.csv") + os.path.join(TEST_DATA_DIR, "retracements_result_data.csv") ) print("retracements test time: ", time.time() - start_time) pd.testing.assert_frame_equal(retracements, retracements_result_data, check_dtype=False)