转载需注明出处:http://blog.csdn.net/minimicall,http://cloudtrade.top/
Lean引擎的模块划分非常的规范。其中DataFeed是数据槽,就是供应数据的模块。
1. IDataFeed 接口
模块的接口为:
namespace QuantConnect.Lean.Engine.DataFeeds { /// <summary> /// Datafeed interface for creating custom datafeed sources. /// 数据供应的借口 /// </summary> public interface IDataFeed { /******************************************************** * INTERFACE PROPERTIES *********************************************************/ /// <summary> /// List of the subscription the algorithm has requested. Subscriptions contain the type, sourcing information and manage the enumeration of data. /// 订阅列表 /// </summary> List<SubscriptionDataConfig> Subscriptions { get; } /// <summary> /// Prices of the datafeed this instant for dynamically updating security values (and calculation of the total portfolio value in realtime). /// 实时价格 /// </summary> /// <remarks>Indexed in order of the subscriptions</remarks> List<decimal> RealtimePrices { get; } /// <summary> /// Cross-threading queues so the datafeed pushes data into the queue and the primary algorithm thread reads it out. /// 跨线程使用的队列,datafeed线程放入数据,算法主线程读出数据 /// </summary> ConcurrentQueue<List<BaseData>>[] Bridge { get; set; } /// <summary> /// Boolean flag indicating there is no more data in any of our subscriptions. /// </summary> bool EndOfBridges { get; } /// <summary> /// Array of boolean flags indicating the data status for each queue/subscription we're tracking. /// </summary> bool[] EndOfBridge { get; } /// <summary> /// Set the source of the data we're requesting for the type-readers to know where to get data from. /// </summary> /// <remarks>Live or Backtesting Datafeed</remarks> DataFeedEndpoint DataFeed { get; set; } /// <summary> /// Public flag indicator that the thread is still busy. /// 设置该线程是否活跃 /// </summary> bool IsActive { get; } /// <summary> /// The most advanced moment in time for which the data feed has completed loading data /// </summary> DateTime LoadedDataFrontier { get; } /// <summary> /// Data has completely loaded and we don't expect any more. /// </summary> bool LoadingComplete { get; } /******************************************************** * INTERFACE METHODS *********************************************************/ /// <summary> /// Primary entry point. /// </summary> void Run(); /// <summary> /// External controller calls to signal a terminate of the thread. /// </summary> void Exit(); /// <summary> /// Purge all remaining data in the thread. /// </summary> void PurgeData(); } }IDataFeed是数据槽接口,是其他实现类必须实现的。
2. BaseDataFeed 数据槽基类
它实现IDataFeed,并且是其他派生类的一个基类。
namespace QuantConnect.Lean.Engine.DataFeeds { /// <summary> /// Common components of a data feed allowing the extender to implement only the parts which matter. /// 数据槽的基类,允许派生类定制部分 /// </summary> public abstract class BaseDataFeed : IDataFeed { /******************************************************** * CLASS VARIABLES *********************************************************/ private IAlgorithm _algorithm; private BacktestNodePacket _job; private bool _endOfStreams = false; private int _subscriptions = 0; private int _bridgeMax = 500000; private bool _exitTriggered = false; private DateTime[] _frontierTime; /******************************************************** * CLASS PROPERTIES *********************************************************/ /// <summary> /// List of the subscription the algorithm has requested. Subscriptions contain the type, sourcing information and manage the enumeration of data. /// 订阅列表信息 /// </summary> public List<SubscriptionDataConfig> Subscriptions { get; private set; } /// <summary> /// Prices of the datafeed this instant for dynamically updating security values (and calculation of the total portfolio value in realtime). /// 实时价格 /// </summary> /// <remarks>Indexed in order of the subscriptions</remarks> public List<decimal> RealtimePrices { get; private set; } /// <summary> /// Cross-threading queues so the datafeed pushes data into the queue and the primary algorithm thread reads it out. /// 桥 /// </summary> public ConcurrentQueue<List<BaseData>>[] Bridge { get; set; } /// <summary> /// Stream created from the configuration settings. /// 配置产生的流 /// </summary> public SubscriptionDataReader[] SubscriptionReaderManagers { get; set; } /// <summary> /// Set the source of the data we're requesting for the type-readers to know where to get data from. /// </summary> /// <remarks>Live or Backtesting Datafeed</remarks> public DataFeedEndpoint DataFeed { get; set; } /// <summary> /// Flag indicating the hander thread is completely finished and ready to dispose. /// </summary> public bool IsActive { get; private set; } /// <summary> /// Flag indicating the file system has loaded all files. /// </summary> public bool LoadingComplete { get; private set; } /// <summary> /// Furthest point in time that the data has loaded into the bridges. /// </summary> public DateTime LoadedDataFrontier { get; private set; } /// <summary> /// Signifying no more data across all bridges /// </summary> public bool EndOfBridges { get { for (var i = 0; i < Bridge.Length; i++) { if (Bridge[i].Count != 0 || EndOfBridge[i] != true) { return false; } } return true; } } /// <summary> /// End of Stream for Each Bridge: /// </summary> public bool[] EndOfBridge { get; set; } /******************************************************** * CLASS CONSTRUCTOR *********************************************************/ /// <summary> /// Create an instance of the base datafeed. /// </summary> public BaseDataFeed(IAlgorithm algorithm, BacktestNodePacket job) { //Save the data subscriptions Subscriptions = algorithm.SubscriptionManager.Subscriptions;//是一个链表,每个节点代表了对一种证券资产数据的订阅 _subscriptions = Subscriptions.Count;//订阅了证券数目 //Public Properties: DataFeed = DataFeedEndpoint.FileSystem;//默认赋予从文件系统读取 IsActive = true;//线程是否活跃 Bridge = new ConcurrentQueue<List<BaseData>>[_subscriptions];//桥是一个链表的链表 EndOfBridge = new bool[_subscriptions]; SubscriptionReaderManagers = new SubscriptionDataReader[_subscriptions];//初始化读者列表 RealtimePrices = new List<decimal>(_subscriptions);//初始化实时价格数据列表 _frontierTime = new DateTime[_subscriptions]; //Class Privates: _job = job;//相关任务 _algorithm = algorithm;//相关算法 _endOfStreams = false; _bridgeMax = _bridgeMax / _subscriptions; //Initialize arrays: for (var i = 0; i < _subscriptions; i++) { _frontierTime[i] = job.PeriodStart; EndOfBridge[i] = false; Bridge[i] = new ConcurrentQueue<List<BaseData>>();//分配每个订阅桥节点的数据链表 //为每个订阅分配读者 SubscriptionReaderManagers[i] = new SubscriptionDataReader(Subscriptions[i], algorithm.Securities[Subscriptions[i].Symbol], DataFeedEndpoint.Database, job.PeriodStart, job.PeriodFinish); } } /// <summary> /// Launch the primary data thread. /// 读数据的线程主函数 /// </summary> public virtual void Run() { while (!_exitTriggered && IsActive && !EndOfBridges) { for (var i = 0; i < Subscriptions.Count; i++) { //With each subscription; fetch the next increment of data from the queues: //为每一个订阅,读取下一个数据 var subscription = Subscriptions[i];//第i个证券订阅 if (Bridge[i].Count < 10000 && !EndOfBridge[i])//确定该证券读取的数据个数没有超出界限 { var data = GetData(subscription);//读取数据的函数,返回数据 //Comment out for live databases, where we should continue asking even if no data. if (data.Count == 0)//如果这个订阅没有数据,那么这个订阅就读取结束,跳到下一个订阅读取 { EndOfBridge[i] = true;//本订阅读取结束 continue; } ////Insert data into bridge, each list is time-grouped. Assume all different time-groups. foreach (var obj in data) { Bridge[i].Enqueue(new List<BaseData>() { obj }); } ////Record the furthest moment in time. _frontierTime[i] = data.Max(bar => bar.Time); } } //Set the most backward moment in time we've loaded LoadedDataFrontier = _frontierTime.Min(); } IsActive = false; } /// <summary> /// Get the next set of data for this subscription /// 获取该订阅的下一集合数据 /// </summary> /// <param name="subscription"></param> /// <returns></returns> public abstract List<BaseData> GetData(SubscriptionDataConfig subscription); /// <summary> /// Send an exit signal to the thread. /// </summary> public virtual void Exit() { _exitTriggered = true; PurgeData(); } /// <summary> /// Loop over all the queues and clear them to fast-quit this thread and return to main. /// </summary> public virtual void PurgeData() { foreach (var t in Bridge) { t.Clear(); } } } }3 FileSystemDataFeed文件系统数据槽
namespace QuantConnect.Lean.Engine.DataFeeds { /******************************************************** * CLASS DEFINITIONS *********************************************************/ /// <summary> /// Historical datafeed stream reader for processing files on a local disk. /// 从本地磁盘加载历史数据 /// </summary> /// <remarks>Filesystem datafeeds are incredibly fast</remarks> public class FileSystemDataFeed : IDataFeed { /******************************************************** * CLASS VARIABLES *********************************************************/ // Set types in public area to speed up: private IAlgorithm _algorithm; private BacktestNodePacket _job; private bool _endOfStreams = false; private int _subscriptions = 0; private int _bridgeMax = 500000; private bool _exitTriggered = false; /******************************************************** * CLASS PROPERTIES *********************************************************/ /// <summary> /// List of the subscription the algorithm has requested. Subscriptions contain the type, sourcing information and manage the enumeration of data. /// </summary> public List<SubscriptionDataConfig> Subscriptions { get; private set; } /// <summary> /// Prices of the datafeed this instant for dynamically updating security values (and calculation of the total portfolio value in realtime). /// </summary> /// <remarks>Indexed in order of the subscriptions</remarks> public List<decimal> RealtimePrices { get; private set; } /// <summary> /// Cross-threading queues so the datafeed pushes data into the queue and the primary algorithm thread reads it out. /// </summary> public ConcurrentQueue<List<BaseData>>[] Bridge { get; set; } /// <summary> /// Set the source of the data we're requesting for the type-readers to know where to get data from. /// </summary> /// <remarks>Live or Backtesting Datafeed</remarks> public DataFeedEndpoint DataFeed { get; set; } /// <summary> /// Flag indicating the hander thread is completely finished and ready to dispose. /// </summary> public bool IsActive { get; private set; } /// <summary> /// Flag indicating the file system has loaded all files. /// </summary> public bool LoadingComplete { get; private set; } /// <summary> /// Furthest point in time that the data has loaded into the bridges. /// </summary> public DateTime LoadedDataFrontier { get; private set; } /// <summary> /// Stream created from the configuration settings. /// </summary> private SubscriptionDataReader[] SubscriptionReaders { get; set; } /// <summary> /// Signifying no more data across all bridges /// </summary> public bool EndOfBridges { get { for (var i = 0; i < Bridge.Length; i++) { if (Bridge[i].Count != 0 || EndOfBridge[i] != true || _endOfStreams != true) { return false; } } return true; } } /// <summary> /// End of Stream for Each Bridge: /// </summary> public bool[] EndOfBridge { get; set; } /// <summary> /// Frontiers for each fill forward high water mark /// </summary> public DateTime[] FillForwardFrontiers; /******************************************************** * CLASS CONSTRUCTOR *********************************************************/ /// <summary> /// Create a new backtesting data feed. /// </summary> /// <param name="algorithm">Instance of the algorithm</param> /// <param name="job">Algorithm work task</param> public FileSystemDataFeed(IAlgorithm algorithm, BacktestNodePacket job) { Console.WriteLine("FileSystemDataFeed,algorithm:" + algorithm + ",job: " + job); Subscriptions = algorithm.SubscriptionManager.Subscriptions; Console.WriteLine("Subscriptions.count:" + Subscriptions.Count); _subscriptions = Subscriptions.Count; //Public Properties: DataFeed = DataFeedEndpoint.FileSystem; IsActive = true; Bridge = new ConcurrentQueue<List<BaseData>>[_subscriptions]; EndOfBridge = new bool[_subscriptions]; SubscriptionReaders = new SubscriptionDataReader[_subscriptions]; FillForwardFrontiers = new DateTime[_subscriptions]; RealtimePrices = new List<decimal>(_subscriptions); //Class Privates: _job = job; _algorithm = algorithm; _endOfStreams = false; _bridgeMax = _bridgeMax / _subscriptions; //Set the bridge maximum count: for (var i = 0; i < _subscriptions; i++) { //Create a new instance in the dictionary: Bridge[i] = new ConcurrentQueue<List<BaseData>>(); EndOfBridge[i] = false; SubscriptionReaders[i] = new SubscriptionDataReader(Subscriptions[i], _algorithm.Securities[Subscriptions[i].Symbol], DataFeed, _job.PeriodStart, _job.PeriodFinish); FillForwardFrontiers[i] = new DateTime(); } } /******************************************************** * CLASS METHODS *********************************************************/ /// <summary> /// Main routine for datafeed analysis. /// </summary> /// <remarks>This is a hot-thread and should be kept extremely lean. Modify with caution.</remarks> public void Run() { Log.Trace("debug FileSystemDataFeed.run()"); Console.WriteLine("FileSystemDataFeed.run()"); //Calculate the increment based on the subscriptions: var tradeBarIncrements = CalculateIncrement(includeTick: false); var increment = CalculateIncrement(includeTick: true); //Loop over each date in the job foreach (var date in Time.EachTradeableDay(_algorithm.Securities, _job.PeriodStart, _job.PeriodFinish)) { Log.Trace("in trading date:"+date+",PeriodStart:"+_job.PeriodStart+",PeriodFinish:"+_job.PeriodFinish); //Update the source-URL from the BaseData, reset the frontier to today. Update the source URL once per day. // this is really the next frontier in the future var frontier = date.Add(increment); var activeStreams = _subscriptions; Log.Trace("subscription:" + _subscriptions); //Initialize the feeds to this date: for (var i = 0; i < _subscriptions; i++) { //Don't refresh source when we know the market is closed for this security: Log.Trace("i:"+i+"subscription"); var success = SubscriptionReaders[i].RefreshSource(date); //If we know the market is closed for security then can declare bridge closed. if (success) { EndOfBridge[i] = false; } else { ProcessMissingFileFillForward(SubscriptionReaders[i], i, tradeBarIncrements, date); EndOfBridge[i] = true; } } //Pause the DataFeed var bridgeFullCount = Bridge.Count(bridge => bridge.Count >= _bridgeMax); var bridgeZeroCount = Bridge.Count(bridge => bridge.Count == 0); var active = GetActiveStreams(); //Pause here while bridges are full, but allow missing files to pass while (bridgeFullCount > 0 && ((_subscriptions - active) == bridgeZeroCount) && !_exitTriggered) { bridgeFullCount = Bridge.Count(bridge => bridge.Count >= _bridgeMax); bridgeZeroCount = Bridge.Count(bridge => bridge.Count == 0); Thread.Sleep(5); } // for each smallest resolution var datePlusOneDay = date.Date.AddDays(1); while ((frontier.Date == date.Date || frontier.Date == datePlusOneDay) && !_exitTriggered) { var cache = new List<BaseData>[_subscriptions]; //Reset Loop: long earlyBirdTicks = 0; //Go over all the subscriptions, one by one add a minute of data to the bridge. //对所订阅的证券进行一个个的加载,加载到数据桥中 for (var i = 0; i < _subscriptions; i++) { //Get the reader manager:获得第i个证券的读者 var manager = SubscriptionReaders[i]; //End of the manager stream set flag to end bridge: also if the EOB flag set, from the refresh source method above if (manager.EndOfStream || EndOfBridge[i]) { EndOfBridge[i] = true; activeStreams = GetActiveStreams(); if (activeStreams == 0) { frontier = frontier.Date + TimeSpan.FromDays(1); } continue; } //Initialize data store: cache[i] = new List<BaseData>(2); //Add the last iteration to the new list: only if it falls into this time category //下面这个代码很关键,它把当前读到的数据条放到该证券对应的链表里面 var cacheAtIndex = cache[i]; while (manager.Current.EndTime < frontier) { Log.Trace("Current:symbol:" + manager.Current.Symbol + ",price" + manager.Current.Price); cacheAtIndex.Add(manager.Current);//放Current到该证券对应的链表里面 Log.Trace(string.Format("FileSystemDataFeed,Current: {0}", manager.Current)); if (!manager.MoveNext()) break;//读取下一个数据 } //Save the next earliest time from the bridges: only if we're not filling forward. if (manager.Current != null) { if (earlyBirdTicks == 0 || manager.Current.EndTime.Ticks < earlyBirdTicks) { earlyBirdTicks = manager.Current.EndTime.Ticks; } } } if (activeStreams == 0) { break; } //Add all the lists to the bridge, release the bridge //we push all the data up to this frontier into the bridge at once for (var i = 0; i < _subscriptions; i++) { if (cache[i] != null && cache[i].Count > 0) { FillForwardFrontiers[i] = cache[i][0].EndTime; Bridge[i].Enqueue(cache[i]); } ProcessFillForward(SubscriptionReaders[i], i, tradeBarIncrements); } //This will let consumers know we have loaded data up to this date //So that the data stream doesn't pull off data from the same time period in different events LoadedDataFrontier = frontier; if (earlyBirdTicks > 0 && earlyBirdTicks > frontier.Ticks) { //Jump increment to the nearest second, in the future: Round down, add increment frontier = (new DateTime(earlyBirdTicks)).RoundDown(increment) + increment; } else { //Otherwise step one forward. frontier += increment; } } // End of This Day. if (_exitTriggered) break; } // End of All Days: Log.Trace(DataFeed + ".Run(): Data Feed Completed."); LoadingComplete = true; //Make sure all bridges empty before declaring "end of bridge": while (!EndOfBridges && !_exitTriggered) { for (var i = 0; i < _subscriptions; i++) { //Nothing left in the bridge, mark it as finished if (Bridge[i].Count == 0) { EndOfBridge[i] = true; } } if (GetActiveStreams() == 0) _endOfStreams = true; Thread.Sleep(100); } //Close up all streams: for (var i = 0; i < Subscriptions.Count; i++) { SubscriptionReaders[i].Dispose(); } Log.Trace(DataFeed + ".Run(): Ending Thread... "); IsActive = false; } /// <summary> /// Send an exit signal to the thread. /// 退出该线程 /// </summary> public void Exit() { _exitTriggered = true; PurgeData(); } /// <summary> /// Loop over all the queues and clear them to fast-quit this thread and return to main. /// 清除缓存 /// </summary> public void PurgeData() { foreach (var t in Bridge) { t.Clear(); } } private void ProcessMissingFileFillForward(SubscriptionDataReader manager, int i, TimeSpan increment, DateTime dateToFill) { // we'll copy the current into the next day var subscription = Subscriptions[i]; if (!subscription.FillDataForward || manager.Current == null) return; var start = dateToFill.Date + manager.Exchange.MarketOpen; if (subscription.ExtendedMarketHours) { start = dateToFill.Date + manager.Exchange.ExtendedMarketOpen; } // shift the 'start' time to the end of the bar by adding the increment, this makes 'date' // the end time which also allows the market open functions to behave as expected var current = manager.Current; for (var endTime = start.Add(increment); endTime.Date == dateToFill.Date; endTime = endTime + increment) { if (manager.IsMarketOpen(endTime) || (subscription.ExtendedMarketHours && manager.IsExtendedMarketOpen(endTime))) { EnqueueFillForwardData(i, current, endTime); } else { // stop fill forwarding when we're no longer open break; } } } /// <summary> /// If this is a fillforward subscription, look at the previous time, and current time, and add new /// objects to queue until current time to fill up the gaps. /// </summary> /// <param name="manager">Subscription to process</param> /// <param name="i">Subscription position in the bridge ( which queue are we pushing data to )</param> /// <param name="increment">Timespan increment to jump the fillforward results</param> private void ProcessFillForward(SubscriptionDataReader manager, int i, TimeSpan increment) { // If previous == null cannot fill forward nothing there to move forward (e.g. cases where file not found on first file). if (!Subscriptions[i].FillDataForward || manager.Previous == null || manager.Current == null) return; //Last tradebar and the current one we're about to add to queue: var previous = manager.Previous; var current = manager.Current; // final two points of file that ends at midnight, causes issues in the day rollover/fill forward if (current.EndTime.TimeOfDay.Ticks == 0 && previous.EndTime == current.Time) { return; } //Initialize the frontier: if (FillForwardFrontiers[i].Ticks == 0) FillForwardFrontiers[i] = previous.EndTime; // using the previous to fill forward since 'current' is ahead the frontier var whatToFill = previous; // using current.EndTime as fill until because it's the next piece of data we have for this subscription var fillUntil = current.EndTime; //Data ended before the market closed: premature ending flag - continue filling forward until market close. if (manager.EndOfStream && manager.IsMarketOpen(current.EndTime)) { //Make sure we only fill forward to end of *today* -- don't fill forward tomorrow just because its also open fillUntil = FillForwardFrontiers[i].Date.AddDays(1); // since we ran out of data, use the current as the clone source, it's more recent than previous whatToFill = current; } // loop from our last time (previous.EndTime) to our current.EndTime, filling in all missing day during // request market hours for (var endTime = FillForwardFrontiers[i] + increment; (endTime < fillUntil); endTime = endTime + increment) { if (Subscriptions[i].ExtendedMarketHours) { if (!manager.IsExtendedMarketOpen(endTime.Subtract(increment))) { //If we've asked for extended hours, and the security is no longer inside extended market hours, skip: continue; } } else { // if the market isn't open skip to the current.EndTime and rewind until the market is open // this is the case where the previous value is from yesterday but we're trying to fill forward // the next day, so instead of zooming through 18 hours of off-market hours, skip to our current data // point and rewind the market open. // // E.g, Current.EndTime = 9:40am and Previous.EndTime = 2:00pm, so fill in from 2->4pm // and then skip to 9:40am, reverse to 9:30am and fill from 9:30->9:40 if (!manager.IsMarketOpen(endTime.Subtract(increment)) && Subscriptions[i].Resolution != Resolution.Daily) { // Move fill forward so we don't waste time in this tight loop. endTime = fillUntil; do { endTime = endTime - increment; } // is market open assumes start time of bars, open at 9:30 closed at 4:00 // so decrement our date to use the start time while (manager.IsMarketOpen(endTime.Subtract(increment))); continue; } } // add any overlap condition here if (Subscriptions[i].Resolution == Resolution.Daily) { // handle fill forward on lower resolutions var barStartTime = endTime - increment; if (manager.Exchange.IsOpenDuringBar(barStartTime, endTime, Subscriptions[i].ExtendedMarketHours)) { EnqueueFillForwardData(i, previous, endTime); } // special case catch missing days else if (endTime.TimeOfDay.Ticks == 0 && manager.Exchange.DateIsOpen(endTime.Date.AddDays(-1))) { EnqueueFillForwardData(i, previous, endTime); } continue; } EnqueueFillForwardData(i, whatToFill, endTime); } } private void EnqueueFillForwardData(int i, BaseData previous, DateTime dataEndTime) { var cache = new List<BaseData>(1); var fillforward = previous.Clone(true); fillforward.Time = dataEndTime.Subtract(Subscriptions[i].Increment); fillforward.EndTime = dataEndTime; FillForwardFrontiers[i] = dataEndTime; cache.Add(fillforward); Bridge[i].Enqueue(cache); } /// <summary> /// Get the number of active streams still EndOfBridge array. /// </summary> /// <returns>Count of the number of streams with data</returns> private int GetActiveStreams() { //Get the number of active streams: var activeStreams = (from stream in EndOfBridge where stream == false select stream).Count(); return activeStreams; } /// <summary> /// Calculate the minimum increment to scan for data based on the data requested. /// </summary> /// <param name="includeTick">When true the subscriptions include a tick data source, meaning there is almost no increment.</param> /// <returns>Timespan to jump the data source so it efficiently orders the results</returns> private TimeSpan CalculateIncrement(bool includeTick) { var increment = TimeSpan.FromDays(1); foreach (var config in Subscriptions) { switch (config.Resolution) { //Hourly TradeBars: case Resolution.Hour: if (increment > TimeSpan.FromHours(1)) { increment = TimeSpan.FromHours(1); } break; //Minutely TradeBars: case Resolution.Minute: if (increment > TimeSpan.FromMinutes(1)) { increment = TimeSpan.FromMinutes(1); } break; //Secondly Bars: case Resolution.Second: if (increment > TimeSpan.FromSeconds(1)) { increment = TimeSpan.FromSeconds(1); } break; //Ticks: No increment; just fire each data piece in as they happen. case Resolution.Tick: if (increment > TimeSpan.FromMilliseconds(1) && includeTick) { increment = new TimeSpan(0, 0, 0, 0, 1); } break; } } return increment; } } // End FileSystem Local Feed Class: } // End Namespace
4. BackTestingDataFeed 回归测试数据槽
namespace QuantConnect.Lean.Engine.DataFeeds { /******************************************************** * CLASS DEFINITIONS *********************************************************/ /// <summary> /// Backtesting data feed extends the filesystem data feed with almost no modifications. Later this method can /// be used for implementing alternative sources/generation for backtesting data. /// 回归测试数据槽是文件系统数据槽的派生类 /// </summary> public class BacktestingDataFeed : FileSystemDataFeed { /******************************************************** * CLASS VARIABLES *********************************************************/ /******************************************************** * CLASS PROPERTIES *********************************************************/ /******************************************************** * CLASS CONSTRUCTOR *********************************************************/ /// <summary> /// Pass through the backtesting datafeed to the underlying file system datafeed implementation. /// </summary> /// <param name="algorithm">Algorithm we're operating with</param> /// <param name="job">Algorithm worker job</param> public BacktestingDataFeed(IAlgorithm algorithm, BacktestNodePacket job) : base(algorithm, job) { DataFeed = DataFeedEndpoint.Backtesting; } } // End Backtesting Feed Class: } // End Namespace
此外还有数据库数据槽DataBaseDataFeed和LiveTradingDataFeed实时交易数据槽。在这里就不在说明。