在使用队列前,我们了解队列的特点是先进先出
1 . 新建一个线程操作类Process.cs
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace 安全队列
{
public class Process
{
/// <summary>
/// 大数据的处理,我们必然会用到多线程和队列来处理数据,已提交高效率和资源利用率, 线程安全队列,
/// 使用场景主要是多线操作同一个队列,
/// 多个推送数据入队,同时多个线程消费队列,有可能出现排队抢资源的情况,数据脏读,线程安全队列就排上用场了;
/// </summary>
ConcurrentQueue<int> queue = new ConcurrentQueue<int>();
/// <summary>
/// 生产
/// </summary>
public void Push()
{
int i = 0;
while (true)
{
try
{
lock (this.queue)
{
//由于本机电脑资源有限,所以队列入队数据做了现在,如果不做限制,程序会出异常,服务器资源不够
//通常情况下,大数据的处理,正式服务器资源是要好得多的,所以程序的设计是要考虑服务器资源和性能的;
//有多少资源干多少活;
// Thread.Sleep(2 * 1000);
queue.Enqueue(i);
//原子级递增
Interlocked.Increment(ref i);
//i++;
Thread.Sleep(1);
}
}
catch (Exception)
{
}
}
}
/// <summary>
/// 消费
/// </summary>
public void Dequeues()
{
while (true)
{
try
{
int data;
//该方法为线程安全方法
if (queue.TryDequeue(out data))
{
Console.WriteLine(data);
}
}
catch (Exception)
{
}
}
}
/// <summary>
/// 启动线程
/// </summary>
public void Do()
{
//生产线程
Task.Factory.StartNew(Push);
//消费线程
for (int i = 0; i < 2; i++)
{
Task.Factory.StartNew(Dequeues);
}
}
}
}
2 . 演示
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace 安全队列
{
class Program
{
static ConcurrentDictionary<string, string> dic = new System.Collections.Concurrent.ConcurrentDictionary<string, string>();
static Dictionary<int, int> dic1 = new System.Collections.Generic.Dictionary<int, int>();
static BlockingCollection<int> blok = new System.Collections.Concurrent.BlockingCollection<int>(10);
static ConcurrentQueue<string> conQueue = new System.Collections.Concurrent.ConcurrentQueue<string>();
static int totalCount=0;
/// <summary>
/// 并发字典添加
/// </summary>
static void Push()
{
for (int i = 0; i < 10; i++)
{
dic.TryAdd(i.ToString(), i.ToString());
}
}
/// <summary>
/// 并发字典取出数据
/// </summary>
static void Pop()
{
var lenght = dic.Count;
if (dic.Count > 0)
{
for (int i = 0; i < lenght; i++)
{
var straa1 = string.Empty;
//aaa.TryGetValue(i.ToString(), out straa);
dic.TryRemove(i.ToString(), out straa1);
Console.WriteLine(straa1);
}
}
}
/// <summary>
/// 普通字典填充
/// </summary>
static void Push1()
{
lock (dic1)
{
dic1.Add(0, 0);
for (int i = 0; i < 30; i++)
{
if (!dic1.Keys.Contains(i))
dic1.Add(i, i);
}
}
}
/// <summary>
/// 普通字典取值
/// </summary>
static void Pop1()
{
lock (dic1)
{
var length = dic1.Count;
if (dic1.Count > 0)
{
var val = 0;
for (int i = 0; i < length; i++)
{
var isOrNo = dic1.TryGetValue(i, out val);
if (isOrNo)
{
dic1.Remove(i);
}
Console.WriteLine(val + "/" + dic1.Count);
}
}
}
}
/// <summary>
/// 安全线程阻塞队列填充
/// </summary>
static void Push2()
{
for (int i = 0; i < 15; i++)
{
blok.Add(i);
}
}
/// <summary>
/// 安全线程取值
/// </summary>
static void Pop2()
{
//foreach (var item in blok)
//{
// Console.WriteLine(item);
//}
while (true)
{
if (blok.Count > 0)
{
Console.WriteLine(blok.Take() + "/" + blok.Count);
}
}
}
/// <summary>
/// 线程安全队列
/// </summary>
static void Push3()
{
for (int i = 0; i < 17; i++)
{
conQueue.Enqueue(i.ToString());
System.Threading.Interlocked.Increment(ref totalCount);
}
}
/// <summary>
/// 从线程安全队列中取出
/// </summary>
static void Pop3()
{
string result = string.Empty;
while (true)
{
if (!conQueue.IsEmpty)
{
if (conQueue.TryDequeue(out result))
{
}
Console.WriteLine(result);
}
Thread.Sleep(200);
}
}
static void Peek3()
{
//Thread.Sleep(3000);
bool isruning = true;
while (isruning)
{
//string result = string.Empty;
//conQueue.TryPeek(out result);
//Console.WriteLine(result + "/" + conQueue.Count.ToString());
Console.WriteLine(totalCount);
if (totalCount == conQueue.Count)
{
isruning = false;
}
}
}
/// <summary>
/// 程序主入口
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
// Task.Factory.StartNew(Push3);
//Task.Factory.StartNew(Pop3);
//Task.Factory.StartNew(Peek3);
Process pr = new Process();
pr.Do();
Console.Read();
}
}
}