转载需注明出处:http://blog.csdn.net/minimicall,http://cloudtrade.top
我们之前说明了数据读者,数据槽。将数据读取到队列中,在算法主线程中需要使用DataFeed线程的数据。这是一个典型的读者-写着问题。
在主线程中和DataFeed打教导的事DataStream。下面我们看它的代码。说明在注释中说明了。
/* * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at http://www.apache.org /licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ using System; using System.Collections.Generic; using System.Threading; using System.Diagnostics; using System.Linq; using QuantConnect.Data; using QuantConnect.Lean.Engine.DataFeeds; using QuantConnect.Logging; namespace QuantConnect.Lean.Engine { /******************************************************** * QUANTCONNECT NAMESPACES *********************************************************/ /// <summary> /// Data stream class takes a datafeed hander and converts it into a synchronized enumerable data format for looping /// 数据流类,拥有数据槽句柄,然后将数据转化为同步可枚举的,用作循环。 /// in the primary algorithm thread. /// </summary> public static class DataStream { /******************************************************** * CLASS VARIABLES *********************************************************/ //Count of bridges and subscriptions. //订阅数,就是addSecurity添加的证券数量 private static int _subscriptions; /******************************************************** * CLASS PROPERTIES *********************************************************/ /// <summary> /// The frontier time of the data stream /// /// </summary> public static DateTime AlgorithmTime { get; private set; } /******************************************************** * CLASS METHODS *********************************************************/ /// <summary> /// Process over the datafeed cross thread bridges to generate an enumerable sorted collection of the data, ready for a consumer /// to use and already synchronized in time. /// </summary> /// <param name="feed">DataFeed object</param> /// <param name="frontierOrigin">Starting date for the data feed</param> /// <returns></returns> public static IEnumerable<Dictionary<int, List<BaseData>>> GetData(IDataFeed feed, DateTime frontierOrigin) { //Initialize: _subscriptions = feed.Subscriptions.Count;//订阅的证券数量 AlgorithmTime = frontierOrigin;//算法前置时间 long algorithmTime = AlgorithmTime.Ticks;//获取时间tick var frontier = frontierOrigin; var nextEmitTime = DateTime.MinValue; var periods = feed.Subscriptions.Select(x => x.Resolution.ToTimeSpan()).ToArray(); //Wait for datafeeds to be ready, wait for first data to arrive: while (feed.Bridge.Length != _subscriptions) Thread.Sleep(100); // clear data first when in live mode, start with fresh data if (Engine.LiveMode) { feed.PurgeData(); } //Get all data in queues: return as a sorted dictionary: //获取队列中的所有数据,以排序好的字典返回 while (!feed.EndOfBridges) { //Reset items which are not fill forward: long earlyBirdTicks = 0; var newData = new Dictionary<int, List<BaseData>>(); // spin wait until the feed catches up to our frontier WaitForDataOrEndOfBridges(feed, frontier); for (var i = 0; i < _subscriptions; i++) { //If there's data on the bridge, check to see if it's time to pull it off, if it's in the future // we'll record the time as 'earlyBirdTicks' so we can fast forward the frontier time while (!feed.Bridge[i].IsEmpty)//第i个证券中有数据 { //Look at first item on list, leave it there until time passes this item. //检查链表中第一个条目 List<BaseData> result; if (!feed.Bridge[i].TryPeek(out result)) { // if there's no item skip to the next subscription //如果这个证券没有数据,那么就直接跳到下一个证券 break; } if (result.Count > 0 && result[0].EndTime > frontier) { // we have at least one item, check to see if its in ahead of the frontier, // if so, keep track of how many ticks in the future it is if (earlyBirdTicks == 0 || earlyBirdTicks > result[0].EndTime.Ticks) { earlyBirdTicks = result[0].EndTime.Ticks; } break; } if (result.Count > 0) { // we have at least one item, check to see if its in ahead of the frontier, // if so, keep track of how many ticks in the future it is if (earlyBirdTicks == 0 || earlyBirdTicks > result[0].EndTime.Ticks) { earlyBirdTicks = result[0].EndTime.Ticks; } } //Pull a grouped time list out of the bridge List<BaseData> dataPoints; if (feed.Bridge[i].TryDequeue(out dataPoints)) { // round the time down based on the requested resolution for fill forward data // this is a temporary fix, long term fill forward logic should be moved into this class foreach (var point in dataPoints) { if (algorithmTime < point.EndTime.Ticks) { // set this to most advanced end point in time, pre rounding // min 10:02 10:02:01(FF) 10:02:01.01(FF) // sec 10:02 10:02:01 10:02:01.01(FF) // tic 10:02 10:02:01 10:02:01.01 // the algorithm time should always be the 'frontier' the furthest // time within this data slice algorithmTime = point.EndTime.Ticks; } if (point.IsFillForward) { point.Time = point.Time.RoundDown(periods[i]); } } // add the list to the collection to be yielded List<BaseData> dp; if (!newData.TryGetValue(i, out dp)) { dp = new List<BaseData>(); newData[i] = dp; } dp.AddRange(dataPoints); } else { //Should never fail: Log.Error("DataStream.GetData(): Failed to dequeue bridge item"); } } } if (newData.Count > 0) { AlgorithmTime = new DateTime(algorithmTime); yield return newData; } //Update the frontier and start again. if (earlyBirdTicks > 0) { //Seek forward in time to next data event from stream: there's nothing here for us to do now: why loop over empty seconds frontier = new DateTime(earlyBirdTicks); } else if (feed.EndOfBridges) { // we're out of data or quit break; } //Allow loop pass through emits every second to allow event handling (liquidate/stop/ect...) if (Engine.LiveMode && DateTime.Now > nextEmitTime) { AlgorithmTime = DateTime.Now.RoundDown(periods.Min()); nextEmitTime = DateTime.Now + TimeSpan.FromSeconds(1); yield return new Dictionary<int, List<BaseData>>(); } } Log.Trace("DataStream.GetData(): All Streams Completed."); } /// <summary> /// Waits until the data feed is ready for the data stream to pull data from it. /// 等待数据槽准备好了,可以从里面拉取数据 /// </summary> /// <param name="feed">The IDataFeed instance populating the bridges</param> /// <param name="dataStreamFrontier">The frontier of the data stream</param> private static void WaitForDataOrEndOfBridges(IDataFeed feed, DateTime dataStreamFrontier) { //Make sure all bridges have data to to peek sync properly. var now = Stopwatch.StartNew(); // timeout to prevent infinite looping here -- 50ms for live and 30sec for non-live var loopTimeout = (Engine.LiveMode) ? 50 : 30000; if (Engine.LiveMode) { // give some time to the other threads in live mode Thread.Sleep(1); } //Waiting for data in the bridges: while (!AllBridgesHaveData(feed) && now.ElapsedMilliseconds < loopTimeout) { Thread.Sleep(1); } //we want to verify that our data stream is never ahead of our data feed. //this acts as a virtual lock around the bridge so we can wait for the feed //to be ahead of us // if we're out of data then the feed will never update (it will stay here forever if there's no more data, so use a timeout!!) while (dataStreamFrontier > feed.LoadedDataFrontier && (!feed.EndOfBridges && !feed.LoadingComplete) && now.ElapsedMilliseconds < loopTimeout) { Thread.Sleep(1); } } /// <summary> /// Check if all the bridges have data or are dead before starting the analysis /// /// This determines whether or not the data stream can pull data from the data feed. /// </summary> /// <param name="feed">Feed Interface with concurrent connection between producer and consumer</param> /// <returns>Boolean true more data to download</returns> private static bool AllBridgesHaveData(IDataFeed feed) { //Lock on the bridge to scan if it has data: for (var i = 0; i < _subscriptions; i++) { if (feed.EndOfBridge[i]) continue; if (feed.Bridge[i].IsEmpty) { return false; } } return true; } /// <summary> /// Resets the frontier time to DateTime.MinValue /// </summary> public static void ResetFrontier() { AlgorithmTime = new DateTime(); } } }