在使用隊列前,我們了解隊列的特點是先進先出
1 . 新建一個線程操作類Process.cs
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace 安全隊列
{
public class Process
{
/// <summary>
/// 大數據的處理,我們必然會用到多線程和隊列來處理數據,已提交高效率和資源利用率, 線程安全隊列,
/// 使用場景主要是多線操作同一個隊列,
/// 多個推送數據入隊,同時多個線程消費隊列,有可能出現排隊搶資源的情況,數據臟讀,線程安全隊列就排上用場了;
/// </summary>
ConcurrentQueue<int> queue = new ConcurrentQueue<int>();
/// <summary>
/// 生產
/// </summary>
public void Push()
{
int i = 0;
while (true)
{
try
{
lock (this.queue)
{
//由於本機電腦資源有限,所以隊列入隊數據做了現在,如果不做限制,程序會出異常,服務器資源不夠
//通常情況下,大數據的處理,正式服務器資源是要好得多的,所以程序的設計是要考慮服務器資源和性能的;
//有多少資源干多少活;
// Thread.Sleep(2 * 1000);
queue.Enqueue(i);
//原子級遞增
Interlocked.Increment(ref i);
//i++;
Thread.Sleep(1);
}
}
catch (Exception)
{
}
}
}
/// <summary>
/// 消費
/// </summary>
public void Dequeues()
{
while (true)
{
try
{
int data;
//該方法為線程安全方法
if (queue.TryDequeue(out data))
{
Console.WriteLine(data);
}
}
catch (Exception)
{
}
}
}
/// <summary>
/// 啟動線程
/// </summary>
public void Do()
{
//生產線程
Task.Factory.StartNew(Push);
//消費線程
for (int i = 0; i < 2; i++)
{
Task.Factory.StartNew(Dequeues);
}
}
}
}
2 . 演示
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace 安全隊列
{
class Program
{
static ConcurrentDictionary<string, string> dic = new System.Collections.Concurrent.ConcurrentDictionary<string, string>();
static Dictionary<int, int> dic1 = new System.Collections.Generic.Dictionary<int, int>();
static BlockingCollection<int> blok = new System.Collections.Concurrent.BlockingCollection<int>(10);
static ConcurrentQueue<string> conQueue = new System.Collections.Concurrent.ConcurrentQueue<string>();
static int totalCount=0;
/// <summary>
/// 並發字典添加
/// </summary>
static void Push()
{
for (int i = 0; i < 10; i++)
{
dic.TryAdd(i.ToString(), i.ToString());
}
}
/// <summary>
/// 並發字典取出數據
/// </summary>
static void Pop()
{
var lenght = dic.Count;
if (dic.Count > 0)
{
for (int i = 0; i < lenght; i++)
{
var straa1 = string.Empty;
//aaa.TryGetValue(i.ToString(), out straa);
dic.TryRemove(i.ToString(), out straa1);
Console.WriteLine(straa1);
}
}
}
/// <summary>
/// 普通字典填充
/// </summary>
static void Push1()
{
lock (dic1)
{
dic1.Add(0, 0);
for (int i = 0; i < 30; i++)
{
if (!dic1.Keys.Contains(i))
dic1.Add(i, i);
}
}
}
/// <summary>
/// 普通字典取值
/// </summary>
static void Pop1()
{
lock (dic1)
{
var length = dic1.Count;
if (dic1.Count > 0)
{
var val = 0;
for (int i = 0; i < length; i++)
{
var isOrNo = dic1.TryGetValue(i, out val);
if (isOrNo)
{
dic1.Remove(i);
}
Console.WriteLine(val + "/" + dic1.Count);
}
}
}
}
/// <summary>
/// 安全線程阻塞隊列填充
/// </summary>
static void Push2()
{
for (int i = 0; i < 15; i++)
{
blok.Add(i);
}
}
/// <summary>
/// 安全線程取值
/// </summary>
static void Pop2()
{
//foreach (var item in blok)
//{
// Console.WriteLine(item);
//}
while (true)
{
if (blok.Count > 0)
{
Console.WriteLine(blok.Take() + "/" + blok.Count);
}
}
}
/// <summary>
/// 線程安全隊列
/// </summary>
static void Push3()
{
for (int i = 0; i < 17; i++)
{
conQueue.Enqueue(i.ToString());
System.Threading.Interlocked.Increment(ref totalCount);
}
}
/// <summary>
/// 從線程安全隊列中取出
/// </summary>
static void Pop3()
{
string result = string.Empty;
while (true)
{
if (!conQueue.IsEmpty)
{
if (conQueue.TryDequeue(out result))
{
}
Console.WriteLine(result);
}
Thread.Sleep(200);
}
}
static void Peek3()
{
//Thread.Sleep(3000);
bool isruning = true;
while (isruning)
{
//string result = string.Empty;
//conQueue.TryPeek(out result);
//Console.WriteLine(result + "/" + conQueue.Count.ToString());
Console.WriteLine(totalCount);
if (totalCount == conQueue.Count)
{
isruning = false;
}
}
}
/// <summary>
/// 程序主入口
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
// Task.Factory.StartNew(Push3);
//Task.Factory.StartNew(Pop3);
//Task.Factory.StartNew(Peek3);
Process pr = new Process();
pr.Do();
Console.Read();
}
}
}