功能:
1、開啟之後,7×24 小時自動運行。
2、在共享內存中存放當前交易日的tick數據,方便隨時取用。
3、支持多行情源取數據。經過測試一個行情源峰值帶寬要求為20M,所以使用時要配合帶寬限制。
4、夜盤結束時輸出一下tick數據,白盤結束時輸出所有tick。
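5、支持查詢指令:quit(退出)、help(幫助)、open(手動開啟接收)、close(手動關閉接收)、tradingday(交易日)、tradingcode(可交易合約)、tickdata(輸出接收到的tick數據)、sharememory(共享內存使用率)。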
運行時如下:
貼上代碼:
// FutureDataReceive.cpp : 定義控制台應用程序的入口點。 // #include "stdafx.h" #include "ThostFtdcMdSpiImpl.hpp" #include "ThostFtdcTraderSpiImpl.hpp" #include "future_helper.h" #include "TinyConfig.h" #include "FileMap.h" #include <boost/lockfree/queue.hpp> #include <iostream> #include <conio.h> CFileMap g_fileMap; TinyConfig g_config; CThostFtdcTraderSpiImpl g_trade; vector<shared_ptr<CThostFtdcMdSpiImpl>> g_vecQuote; int g_nPreTradingDay; int g_nTradingDay; bool g_bThread = true; boost::lockfree::queue<CThostFtdcDepthMarketDataField> g_buffer(1000); shared_ptr<thread> g_thdDealData = nullptr; vector<CThostFtdcDepthMarketDataField> g_vecBuffer; void OnLog(const char* p) { printf(p); ofstream ofs_log("./log.data.receive.txt", ios_base::app); ofs_log << future_helper::get_local_datetime().c_str() << " " << p; ofs_log.close(); } #pragma region trade module ///trade call back bool g_bTradeLoginIn = false; void OnTradeComplete() { int nTradingDay = atoi(g_trade.GetTradingDay()); if (g_nTradingDay != 0 && g_nTradingDay != nTradingDay) g_nPreTradingDay = g_nTradingDay; g_nTradingDay = nTradingDay; g_bTradeLoginIn = true; printf("交易日:%d-%d\n", g_nPreTradingDay, g_nTradingDay); } void OnTradeRspMessage(const char* from, const char* msg, future_msg_type msg_type) { OnLog(future_helper::format_string("%s%s:%s\n", msg_type == future_msg_type::CTP_SUCCESS ? 
"" : "*", from, msg).c_str()); } void TradeLogin() { int retry_time = 3; while (1) { g_bTradeLoginIn = false; g_trade.SetCallBackFunc(OnTradeComplete, nullptr, OnTradeRspMessage); g_trade.SetAuthenticate(g_config.GetValue("Trade", "product").c_str(), g_config.GetValue("Trade", "id").c_str(), g_config.GetValue("Trade", "code").c_str()); g_trade.InitParams(g_config.GetValue("Trade", "ip").c_str(), g_config.GetValue("Trade", "broker").c_str(), g_config.GetValue("Trade", "user").c_str(), g_config.GetValue("Trade", "password").c_str()); int tick = GetTickCount(); while (1) { if (g_bTradeLoginIn) break; this_thread::sleep_for(chrono::milliseconds(100)); if (GetTickCount() - tick > 60 * 1000) break; } g_trade.Release(); g_fileMap.SetTradingDay(g_nPreTradingDay, g_nTradingDay); if (g_bTradeLoginIn) break; retry_time--; OnLog("交易 登錄超時[1分鍾],重新登錄\n"); if (retry_time == 0) { OnLog("交易 登錄次數達到3次,不再嘗試重新登錄!\n"); break; } } } ////////////////////////////////////////////////////////////////////////// #pragma endregion trade module ///market call back int g_nMarketLoginIn = 0; void OnMarketComplete() { g_nMarketLoginIn++; } void OnMarketRspMessage(const char* from, const char* msg, future_msg_type msg_type) { OnLog(future_helper::format_string("%s%s:%s\n", msg_type == future_msg_type::CTP_SUCCESS ? 
"" : "*", from, msg).c_str()); } void OnDepthMarketData(CThostFtdcDepthMarketDataField* p) { if (!p) return; g_buffer.push(*p); } void MarketLogin() { int retry_time = 3; std::vector<CThostFtdcInstrumentField> vecTradingCode; g_trade.GetTradingCode(vecTradingCode, THOST_FTDC_PC_Futures); g_trade.GetTradingCode(vecTradingCode, THOST_FTDC_PC_Options); printf("訂閱合約:%d\n", vecTradingCode.size()); g_fileMap.SetTradingCode(vecTradingCode); while (1) { g_nMarketLoginIn = 0; for (int i = 1; i <= 3; i++) { string this_ip = g_config.GetValue("Market", future_helper::format_string("ip%d", i).c_str()); if (this_ip == "") break; auto pMdSpi = make_shared<CThostFtdcMdSpiImpl>(); g_vecQuote.push_back(pMdSpi); pMdSpi->SetTradingCode(vecTradingCode); pMdSpi->SetCallBackFunc(OnDepthMarketData, OnMarketRspMessage, nullptr, OnMarketComplete); pMdSpi->InitParams(this_ip.c_str(), "8888", "88888888", "88888888"); } int tick = GetTickCount(); while (1) { if (g_nMarketLoginIn == g_vecQuote.size()) break; this_thread::sleep_for(chrono::milliseconds(100)); if (GetTickCount() - tick > 60 * 1000) break; } if (g_nMarketLoginIn > 0 && g_nMarketLoginIn != g_vecQuote.size()) { int nUnconnectIndex = 1; OnLog("***********************\n"); OnLog("*有未能正常登錄的行情賬號:\n"); for (auto iter = g_vecQuote.begin(); iter != g_vecQuote.end(); iter++) { if (!(*iter)->IsConnected()) { OnLog(future_helper::format_string("%d、%s\n", nUnconnectIndex, (*iter)->GetFrontIP()).c_str()); nUnconnectIndex++; } } OnLog("***********************\n"); OnLog("已有行情連接成功,未連接的不嘗試重連\n"); } if (g_nMarketLoginIn > 0) break; OnLog("行情 登錄超時[1分鍾],重新登錄\n"); retry_time--; if (retry_time == 0) { OnLog("行情 登錄次數達到3次,不再嘗試重新登錄!\n"); break; } } } void ThreadConsumeTick() { OnLog("處理線程已啟動\n"); map<string, CThostFtdcDepthMarketDataField> m_mapShot; while (g_bThread) { if (g_buffer.empty()) { this_thread::sleep_for(chrono::milliseconds(5)); continue; } CThostFtdcDepthMarketDataField data; g_buffer.pop(data); if (data.UpdateTime[2] == ':') { 
data.UpdateTime[2] = data.UpdateTime[3]; data.UpdateTime[3] = data.UpdateTime[4]; data.UpdateTime[4] = data.UpdateTime[6]; data.UpdateTime[5] = data.UpdateTime[7]; data.UpdateTime[6] = 0; } if (atoi(data.UpdateTime) > 180000) { _snprintf_s(data.TradingDay, 9, "%d", g_nPreTradingDay); } else { _snprintf_s(data.TradingDay, 9, "%d", g_nTradingDay); } bool bNewTick = false; auto find_shot = m_mapShot.find(data.InstrumentID); if (find_shot == m_mapShot.end()) { bNewTick = true; m_mapShot[data.InstrumentID] = data; } else { long long llThis = future_helper::to_longlong(atoi(data.TradingDay), atoi(data.UpdateTime)) * 1000 + data.UpdateMillisec; long long llLast = future_helper::to_longlong(atoi(find_shot->second.TradingDay), atoi(find_shot->second.UpdateTime)) * 1000 + find_shot->second.UpdateMillisec; if (llThis > llLast || find_shot->second.Volume < data.Volume) {//鄭商所沒有毫秒 bNewTick = true; find_shot->second = data; } } if (bNewTick && g_fileMap.IsOK()) { g_fileMap.AddDepthData(&data); } else if (bNewTick) { g_vecBuffer.push_back(data); } } OnLog("處理線程已退出\n"); } //bReset == true:清空共享內存中的tick數據 void OutPutTickData(bool bReset) { unsigned int* exist_size; CThostFtdcDepthMarketDataField* p = g_fileMap.GetDepthData(&exist_size); if (!p || *exist_size <= 0) { OnLog("共享內存中無任何tick數據\n從內存中導出\n"); if (g_vecBuffer.size() == 0) { OnLog("內存中無任何tick數據\n"); return; } future_helper::safe_create_floder((g_config.GetValue("Path", "tick_normal") + "\\" + future_helper::to_string(g_nTradingDay / 10000)).c_str()); ofstream ofs_tick(g_config.GetValue("Path", "tick_normal") + "\\" + future_helper::to_string(g_nTradingDay / 10000) + "\\data_" + future_helper::to_string(g_nTradingDay) + ".txt", ios_base::out); for (auto& field : g_vecBuffer) { char szBuf[1024]; _snprintf_s(szBuf, 1024, //交易日,最后修改時間,最后修改毫秒,合約代碼, //最新價,上次結算價,昨收盤,昨持倉量, //今開盤,最高價,最低價,數量,成交金額,持倉量, //漲停板價,跌停板價, //申買價一,申買量一,申賣價一,申賣量一, //當日均價 
"%s,%s,%d,%s,%.4f,%.4f,%.4f,%.4f,%.4f,%.4f,%.4f,%d,%.4f,%.4f,%.4f,%.4f,%.4f,%d,%.4f,%d,%.4f\n", field.TradingDay, field.UpdateTime, field.UpdateMillisec, field.InstrumentID, field.LastPrice, field.PreSettlementPrice, field.PreClosePrice, field.PreOpenInterest, field.OpenPrice, field.HighestPrice, field.LowestPrice, field.Volume, field.Turnover, field.OpenInterest, field.UpperLimitPrice, field.LowerLimitPrice, field.BidPrice1, field.BidVolume1, field.AskPrice1, field.AskVolume1, field.AveragePrice); ofs_tick << szBuf; } return; } OnLog(future_helper::format_string("輸出共享內存中的tick數據 %d 個\n", *exist_size).c_str()); future_helper::safe_create_floder((g_config.GetValue("Path", "tick_normal") + "\\" + future_helper::to_string(g_nTradingDay / 10000)).c_str()); ofstream ofs_tick(g_config.GetValue("Path", "tick_normal") + "\\" + future_helper::to_string(g_nTradingDay / 10000) + "\\data_" + future_helper::to_string(g_nTradingDay) + ".txt", ios_base::out); for (int i = 0; i < *exist_size; i++) { auto& field = *(CThostFtdcDepthMarketDataField*)(p + i); char szBuf[1024]; _snprintf_s(szBuf, 1024, //交易日,最后修改時間,最后修改毫秒,合約代碼, //最新價,上次結算價,昨收盤,昨持倉量, //今開盤,最高價,最低價,數量,成交金額,持倉量, //漲停板價,跌停板價, //申買價一,申買量一,申賣價一,申賣量一, //當日均價 "%s,%s,%d,%s,%.3f,%.4f,%.4f,%.4f,%.4f,%.4f,%.4f,%d,%.4f,%.4f,%.4f,%.4f,%.4f,%d,%.4f,%d,%.4f", field.TradingDay, field.UpdateTime, field.UpdateMillisec, field.InstrumentID, future_helper::check_double(field.LastPrice), future_helper::check_double(field.PreSettlementPrice), future_helper::check_double(field.PreClosePrice), future_helper::check_double(field.PreOpenInterest), future_helper::check_double(field.OpenPrice), future_helper::check_double(field.HighestPrice), future_helper::check_double(field.LowestPrice), field.Volume, future_helper::check_double(field.Turnover), future_helper::check_double(field.OpenInterest), future_helper::check_double(field.UpperLimitPrice), future_helper::check_double(field.LowerLimitPrice), future_helper::check_double(field.BidPrice1), 
field.BidVolume1, future_helper::check_double(field.AskPrice1), field.AskVolume1, future_helper::check_double(field.AveragePrice)); ofs_tick << szBuf << endl; } ofs_tick.flush(); ofs_tick.close(); if (bReset) *exist_size = 0; } void Open() { printf("%s\n", future_helper::get_local_datetime().c_str()); g_nMarketLoginIn = 0; OnLog("區間交易開始!\n"); if (!g_thdDealData) { g_bThread = true; g_thdDealData = make_shared<thread>(ThreadConsumeTick); } if (!g_fileMap.CreateFileMap(FILE_MAP_KEY)) { OnLog("共享內存創建失敗!直接存文件...\n"); } else { g_fileMap.InitDefaultRange(); } TradeLogin(); unsigned int* exist_count; auto pTick = g_fileMap.GetDepthData(&exist_count); int local_time = atoi(future_helper::get_local_time(false).c_str()); if ((!pTick || *exist_count == 0) && (local_time < 151500 || local_time > 180000)) { ifstream ifs_tick(g_config.GetValue("Path", "tick_normal") + "\\" + future_helper::to_string(g_nTradingDay / 10000) + "\\data_" + future_helper::to_string(g_nTradingDay) + ".txt", ios_base::in); if (ifs_tick.is_open()) { OnLog("本地加載tick到共享內存:"); int tick_count = 0; CThostFtdcDepthMarketDataField data; char szLine[1024]; while (ifs_tick.getline(szLine, 1024)) { future_helper::LineToStruct(szLine, data); g_fileMap.AddDepthData(&data); tick_count++; } OnLog(future_helper::format_string("%d條\n", tick_count).c_str()); } } MarketLogin(); } void Close() { printf("%s\n", future_helper::get_local_datetime().c_str()); int close_count = 0; for (auto& md : g_vecQuote) { if (md->IsConnected()) { close_count++; md->ReleaseAPI(); } } g_vecQuote.clear(); if (close_count > 0) { OnLog(future_helper::format_string("區間交易結束!關閉行情接收:%d\n", close_count).c_str()); if (g_thdDealData && g_thdDealData->joinable()) { g_bThread = false; g_thdDealData->join(); g_thdDealData = nullptr; } } g_nMarketLoginIn = 0; OnLog(future_helper::format_string("共享內存使用%d/%d(mb)=%.2f%%%%\n", g_fileMap.GetTotalUsedSpace() / 1024 / 1024, MAX_PAGE_SIZE / 1024 / 1024, (double)g_fileMap.GetTotalUsedSpace() * 100 / 
MAX_PAGE_SIZE).c_str()); } int main() { if (!g_config.Open((future_helper::GetWorkingDir() + "\\system.ini").c_str())) { OnLog("system.ini打開失敗\n"); system("pause"); } OnLog("執行開啟共享內存測試..."); if (!g_fileMap.CreateFileMap(FILE_MAP_KEY)) { OnLog("失敗!"); system("pause"); } else { OnLog("成功!\n"); g_fileMap.Release(); } OnLog("==========start==========\n"); printf("初次啟動需要輸入前交易日,用來更新夜盤的日期..."); scanf_s("%d", &g_nPreTradingDay); int night_tick_count = -1; int last_local_time = atoi(future_helper::get_local_time(false).c_str()); while (1) { if (_kbhit() != 0) { printf("**********不要長時間阻塞此處,這樣將導致軟件無法正常工作!***********\n"); printf("**********記得輸入完之后按回車鍵哦! ***********\n"); string str; std::getline(std::cin, str); if (str == "quit") { Close(); break; } else if (str == "help") { printf("quit:退出\n"); printf("help:幫助\n"); printf("open:手動開啟接收\n"); printf("close:手動關閉接收\n"); printf("tradingday:交易日\n"); printf("tradingcode:可交易合約\n"); printf("tickdata:輸出接收到的tick數據\n"); printf("sharememory:共享內存使用率\n"); } else if (str == "open") { Open(); } else if (str == "close") { Close(); } else if (str == "tradingday") { printf("交易日:%d-%d\n", g_nPreTradingDay, g_nTradingDay); } else if (str == "tradingcode") { printf("期貨可交易合約:\n"); map<string, vector<CThostFtdcInstrumentField>> mapClasses; vector<CThostFtdcInstrumentField> vec; g_trade.GetTradingCode(vec, THOST_FTDC_PC_Futures); for (auto& item : vec) { mapClasses[item.ProductID].push_back(item); } for (auto iter = mapClasses.begin(); iter != mapClasses.end(); iter++) { printf("%s:", iter->first.c_str()); for (auto& item : iter->second) { printf("%s ", item.InstrumentID); } printf("\n"); } } else if (str == "tickdata") { OutPutTickData(false); } else if (str == "sharememory") { unsigned int* exist_size; CThostFtdcDepthMarketDataField* p = g_fileMap.GetDepthData(&exist_size); OnLog(future_helper::format_string("tick數據個數:%d\n共享內存使用%d/%d(mb)=%.2f%%%%\n", p ? 
*exist_size : 0, g_fileMap.GetTotalUsedSpace() / 1024 / 1024, MAX_PAGE_SIZE / 1024 / 1024, (double)g_fileMap.GetTotalUsedSpace() * 100 / MAX_PAGE_SIZE).c_str()); } } int local_time = atoi(future_helper::get_local_time(false).c_str()); if ((local_time >= 23500 && last_local_time < 23500) || (local_time >= 155000 && last_local_time < 155000)) { unsigned int* exist_count; auto pTick = g_fileMap.GetDepthData(&exist_count); Close(); if ((local_time >= 155000 && last_local_time < 155000)){ OnLog("交易日結束了?..."); if (night_tick_count == -1 || *exist_count > night_tick_count) { OnLog("是的!\n"); OutPutTickData(true); if (night_tick_count != -1) { OnLog("打開TickToKline.exe\n"); ::ShellExecute(NULL, "open", (future_helper::GetWorkingDir() + "\\TickToKline.exe").c_str(), "1", NULL, SW_SHOW); } night_tick_count = -1; } else { OnLog(future_helper::format_string("沒有!(共享內存的數據沒有比夜盤多%d-%d)!\n", night_tick_count, *exist_count).c_str()); } } else { OnLog("夜盤結束了?..."); if (night_tick_count == -1 && *exist_count > 0) { OnLog("是的!輸出一次夜盤數據\n"); night_tick_count = *exist_count; OutPutTickData(false); } else { OnLog("現在不是夜盤結束時間!(沒有夜盤數據 或者 已經輸出過一次夜盤數據了)\n"); } } } if ((local_time >= 82000 && last_local_time < 82000) || (local_time >= 202000 && last_local_time < 202000)) { Close(); Open(); } last_local_time = local_time; this_thread::sleep_for(chrono::seconds(1)); } return 0; }
歡迎討論交流,qq896250188