An Example of Image Classification with TensorFlow


This article uses TensorFlow's basic operations (no high-level libraries) to build a neural network that performs image classification.

The workflow has four main steps: building the dataset, constructing and training a CNN, validating it on the test set, and predicting a single real-world image.

Development environment: Anaconda + Python 3.5 + TensorFlow 1.3

The file structure is shown below: `data` holds the images, `net` stores the trained model, `TFrecorder` contains the dataset-creation and image-preprocessing code, `resnet` defines the 18-layer ResNet architecture, and the remaining files are self-explanatory.

(figure: project file structure)

1. Building the Dataset

The images for the dataset can be scraped from Baidu with Python and downloaded into folders, one folder per class. We then use TensorFlow's TFRecord format to convert all images to a single format, attach a label to each class, and write out a .tfrecords file that serves as the network's input.

As shown below, the `bottle` and `paper` folders hold the two classes of images (bottles and paper cartons), `record` stores the TFRecord files, and `testdata` holds the images used for final prediction.

(figure: contents of the data folder)

The dataset code therefore breaks down into three functions: writing the TFRecord files, reading them back, and grouping the decoded images into batches. The data is split into a training set and a test set: the first 70% of each class's images go into the training TFRecord file and the remaining 30% into the test file. My implementation follows:
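Before the TensorFlow code, the 70/30 split described above can be sketched on its own with plain Python (the file names here are hypothetical placeholders):

```python
def split_file_list(file_names, train_ratio=0.7):
    """Split an ordered list of image file names into train/test portions."""
    cut = int(len(file_names) * train_ratio)
    return file_names[:cut], file_names[cut:]

# Hypothetical example: 10 images in one class folder
names = ["img%02d.jpg" % i for i in range(10)]
train, test = split_file_list(names)
print(len(train), len(test))  # → 7 3
```

The same `int(len(...) * 0.7)` cut appears in both `create_train_record` and `create_test_record` below, which is what keeps the two sets disjoint.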

import tensorflow as tf
from PIL import Image
import os
import matplotlib.pyplot as plt
import numpy as np

path = r"D:\code\resnet\data"
train_record_path = "data/record/train.tfrecords"
test_record_path = "data/record/test.tfrecords"
classes = ['bottle', 'paper']  # the 2 classes; a list (not a set) keeps label order stable

def _byteslist(value):
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

def _int64list(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

def create_train_record():
    writer = tf.python_io.TFRecordWriter(train_record_path)
    num = 1
    for index, name in enumerate(classes):
        class_path = path + "/" + name + "/"
        l = int(len(os.listdir(class_path)) * 0.7)  # first 70% -> training set
        print("create tf " + str(index))
        for image_name in os.listdir(class_path)[:l]:
            image_path = class_path + image_name
            img = Image.open(image_path)
            img = img.convert('RGB')  # ensure 3 channels
            img = img.resize((224, 224))
            img_raw = img.tobytes()
            example = tf.train.Example(
                    features=tf.train.Features(feature={
                            'label': _int64list(index),
                            'img_raw': _byteslist(img_raw)}))
            writer.write(example.SerializeToString())
            print('create train record', num)
            num += 1
    writer.close()
    print('create_train_record success!')

def create_test_record():
    writer = tf.python_io.TFRecordWriter(test_record_path)
    num = 1
    for index, name in enumerate(classes):
        class_path = path + "/" + name + "/"
        l = int(len(os.listdir(class_path)) * 0.7)
        for image_name in os.listdir(class_path)[l:]:  # remaining 30% -> test set
            image_path = class_path + image_name
            img = Image.open(image_path)
            img = img.convert('RGB')  # ensure 3 channels
            img = img.resize((224, 224))
            img_raw = img.tobytes()
            example = tf.train.Example(
                    features=tf.train.Features(feature={
                            'label': _int64list(index),
                            'img_raw': _byteslist(img_raw)}))
            writer.write(example.SerializeToString())
            print('create test record', num)
            num += 1
    writer.close()
    print('create_test_record success!')

def read_record(filename, img_w, img_h):
    filename_queue = tf.train.string_input_producer([filename])
    reader = tf.TFRecordReader()
    _, serialized_example = reader.read(filename_queue)
    feature = tf.parse_single_example(
            serialized_example,
            features={
                    'label': tf.FixedLenFeature([], tf.int64),
                    'img_raw': tf.FixedLenFeature([], tf.string)})
    label = feature['label']
    img = feature['img_raw']
    img = tf.decode_raw(img, tf.uint8)
    img = tf.reshape(img, (224, 224, 3))
    img = tf.image.resize_image_with_crop_or_pad(img, img_w, img_h)
    img = tf.cast(img, tf.float32) / 255  # normalize pixels to [0, 1]
    label = tf.cast(label, tf.int32)
    return img, label

def get_batch_record(filename, batch_size, img_w, img_h):
    image, label = read_record(filename, img_w, img_h)
    image_batch, label_batch = tf.train.shuffle_batch([image, label],
                                                      batch_size=batch_size,
                                                      capacity=30,
                                                      min_after_dequeue=10)
    label_batch = tf.one_hot(label_batch, depth=2)
    return image_batch, label_batch


# Quick visual check of the pipeline:
# if __name__ == '__main__':
#     img, label = get_batch_record(test_record_path, 2, 224, 224)
#     with tf.Session() as sess:
#         sess.run(tf.global_variables_initializer())
#         sess.run(tf.local_variables_initializer())
#         coord = tf.train.Coordinator()
#         threads = tf.train.start_queue_runners(sess, coord)
#         for i in range(200):
#             image, l = sess.run([img, label])
#             print(image[0].shape, l[0])
#             plt.imshow(image[0])
#             plt.show()
#         coord.request_stop()
#         coord.join(threads)

A note on the code above: when the TFRecord files are written, each class's label is its index in the `classes` list `['bottle', 'paper']`, so images in the `bottle` folder get label 0 (this is also why `classes` must be an ordered list rather than a set, or the label assignment could vary between runs). Image preprocessing is essentially two steps: crop/resize to (224, 224, 3), and normalize the pixel values to the range [0, 1]. The labels are converted to one-hot format when a batch is assembled.
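The label-to-one-hot conversion that `tf.one_hot` performs in the batching function can be verified with a small NumPy sketch (class order assumed to be [bottle, paper], as above):

```python
import numpy as np

def one_hot(labels, depth):
    """Convert integer class labels to one-hot rows, like tf.one_hot."""
    out = np.zeros((len(labels), depth), dtype=np.float32)
    out[np.arange(len(labels)), labels] = 1.0
    return out

# bottle = 0, paper = 1
print(one_hot([0, 1, 1], depth=2))
# → [[1. 0.]
#    [0. 1.]
#    [0. 1.]]
```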

2. Building the Network and Training the Model

I used the 18-layer variant of the ResNet architecture, stacking layers according to the structure in the ResNet paper; to aid my own understanding, I avoided high-level libraries. The main difficulty is implementing the residual connection: when the convolution path's output has the same number of channels as the shortcut, the two can be added directly; when the channel counts differ, the shortcut is first passed through a 1×1 convolution with the target number of channels, and then added. As shown below:

(figure: residual block structure)

The forward-pass code for the ResNet-18 structure is:

import tensorflow as tf
def weight_variable(shape):
    initial = tf.truncated_normal(shape,stddev=0.1)
    return tf.Variable(initial)

def bias_variable(shape):
    initial = tf.constant(0.1,shape = shape)
    return tf.Variable(initial)

def conv2d(input, filter, strides, padding="SAME"):
    return tf.nn.conv2d(input, filter, strides, padding=padding)

def resnet18(input):
    kernel_1 = weight_variable([7,7,3,64])
    bias_1 = bias_variable([64])  # a bias term, not a weight matrix
    layer_1 = tf.nn.relu(conv2d(input,kernel_1,strides=[1,2,2,1]) + bias_1)
    Maxpool_1 = tf.nn.max_pool(layer_1,ksize=[1,3,3,1],strides=[1,2,2,1],padding="SAME")
    
    kernel_2 = weight_variable([3,3,64,64])
    layer_2 = tf.nn.relu(conv2d(Maxpool_1,kernel_2,strides=[1,1,1,1]))
    
    kernel_3 = weight_variable([3,3,64,64])
    layer_3 = conv2d(layer_2,kernel_3,strides=[1,1,1,1]) 
    res1 = tf.nn.relu(Maxpool_1+layer_3)
    
    kernel_4 = weight_variable([3,3,64,64])
    layer_4 = tf.nn.relu(conv2d(res1,kernel_4,strides=[1,1,1,1]))
    
    kernel_5 = weight_variable([3,3,64,64])
    layer_5 = conv2d(layer_4,kernel_5,strides=[1,1,1,1])
    res2 = tf.nn.relu(res1+layer_5)
    
    kernel_6 = weight_variable([3,3,64,128])
    layer_6 = tf.nn.relu(conv2d(res2,kernel_6,strides=[1,2,2,1]))
    
    kernel_7 = weight_variable([3,3,128,128])
    layer_7 = conv2d(layer_6,kernel_7,strides=[1,1,1,1])
    kernel_line_1 = weight_variable([1,1,64,128])
    res3 = tf.nn.relu(conv2d(res2,kernel_line_1,strides=[1,2,2,1]) + layer_7)
    
    kernel_8 = weight_variable([3,3,128,128])
    layer_8 = tf.nn.relu(conv2d(res3,kernel_8,strides=[1,1,1,1]))
    
    kernel_9 = weight_variable([3,3,128,128])
    layer_9 = conv2d(layer_8,kernel_9,strides=[1,1,1,1])
    res4 = tf.nn.relu(res3+layer_9)
    
    kernel_10 = weight_variable([3,3,128,256])
    layer_10 = tf.nn.relu(conv2d(res4,kernel_10,strides=[1,2,2,1]))
    
    kernel_11 = weight_variable([3,3,256,256])
    layer_11 = conv2d(layer_10,kernel_11,strides=[1,1,1,1])
    kernel_line_2 = weight_variable([1,1,128,256]) 
    res5 = tf.nn.relu(conv2d(res4,kernel_line_2,strides=[1,2,2,1])+layer_11)
    
    kernel_12 = weight_variable([3,3,256,256])
    layer_12 = tf.nn.relu(conv2d(res5,kernel_12,strides=[1,1,1,1]))
    
    kernel_13 = weight_variable([3,3,256,256])
    layer_13 = conv2d(layer_12,kernel_13,strides=[1,1,1,1])
    res6 = tf.nn.relu(res5+layer_13)
    
    kernel_14 = weight_variable([3,3,256,512])
    layer_14 = tf.nn.relu(conv2d(res6,kernel_14,strides=[1,2,2,1]))
    
    kernel_15 = weight_variable([3,3,512,512])
    layer_15 = conv2d(layer_14,kernel_15,strides=[1,1,1,1])
    kernel_line_3 = weight_variable([1,1,256,512])
    res7 = tf.nn.relu(conv2d(res6,kernel_line_3,strides=[1,2,2,1])+ layer_15)
    
    kernel_16 = weight_variable([3,3,512,512])
    layer_16 = tf.nn.relu(conv2d(res7,kernel_16,strides=[1,1,1,1]))
    
    kernel_17 = weight_variable([3,3,512,512])
    layer_17 = conv2d(layer_16,kernel_17,strides=[1,1,1,1])
    res8 = tf.nn.relu(layer_17+res7)
    
    avgpool = tf.nn.avg_pool(res8,ksize=[1,7,7,1],strides=[1,1,1,1],padding="VALID")
    
    line = tf.reshape(avgpool,[-1,512])
    
    fc_18 = weight_variable([512,2])
    bias_18 = bias_variable([2])
    layer_18 = tf.matmul(line,fc_18)+bias_18
    
    
    return layer_18
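The shortcut rule used above (a direct add when channel counts match, a 1×1 convolution on the shortcut when they differ) can be illustrated in NumPy. A 1×1 convolution is just a matrix multiply over the channel axis; stride-based downsampling is ignored in this sketch, and the projection kernel here is a hypothetical stand-in for a learned weight:

```python
import numpy as np

def shortcut_add(x, fx, proj=None):
    """Residual merge: y = x + F(x), projecting x with a 1x1 conv
    (a matmul over the channel axis) when channel counts differ."""
    if x.shape[-1] != fx.shape[-1]:
        if proj is None:
            raise ValueError("channel mismatch requires a projection kernel")
        x = x @ proj          # (H, W, C_in) @ (C_in, C_out) -> (H, W, C_out)
    return x + fx

h, w = 4, 4
x = np.ones((h, w, 64))        # shortcut input, 64 channels
fx = np.ones((h, w, 128))      # conv-path output, 128 channels
proj = np.zeros((64, 128))     # hypothetical 1x1 kernel (normally learned)
y = shortcut_add(x, fx, proj)
print(y.shape)  # → (4, 4, 128)
```

This mirrors the `kernel_line_1`/`kernel_line_2`/`kernel_line_3` branches in the network above, which are applied exactly where the channel count doubles.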

The back-propagation code, which feeds in images and labels and minimizes the loss, is:

import tensorflow as tf
import Ipynb_importer
import matplotlib.pyplot as plt
from TFrecorder import get_batch_record
import numpy as np
from resnet import resnet18

batch_size = 20
filename = "data/record/train.tfrecords"
filename_test = "data/record/test.tfrecords"
num_classes = 2
img_w = 224
img_h = 224

x = tf.placeholder(tf.float32, [None, img_w, img_h, 3])
y = tf.placeholder(tf.float32, [None, num_classes])
prediction = resnet18(x)
loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(labels=y, logits=prediction))
train_step = tf.train.AdamOptimizer(0.001).minimize(loss)

correct_prediction = tf.equal(tf.argmax(y, 1), tf.argmax(prediction, 1))  # argmax picks the most probable class
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

image_batch, label_batch = get_batch_record(filename, batch_size, img_w, img_h)
image_batch_test, label_batch_test = get_batch_record(filename_test, batch_size, img_w, img_h)
init = tf.global_variables_initializer()

saver = tf.train.Saver()

with tf.Session() as sess:
    sess.run(init)
    saver.restore(sess, "net/my_resnet18.ckpt")  # resume training; comment out on the first run
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess, coord)
    for i in range(302):
        image, label = sess.run([image_batch, label_batch])
        sess.run(train_step, feed_dict={x: image, y: label})
        l = sess.run(loss, feed_dict={x: image, y: label})
        if i % 20 == 0:
            print("iter: " + str(i) + " loss " + str(l))
        saver.save(sess, "net/my_resnet18.ckpt")
    coord.request_stop()
    coord.join(threads)

Notes on the code: it uses multi-threaded queue readers, so the threads must be stopped and joined at the end. It also supports resuming training from a checkpoint by loading the model parameters before training starts; if the first run fails because no checkpoint exists yet, comment out the `saver.restore` line.
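The loss used in the training script, `softmax_cross_entropy_with_logits`, can be checked by hand with NumPy:

```python
import numpy as np

def softmax_cross_entropy(labels, logits):
    """Per-example softmax cross-entropy, matching
    tf.nn.softmax_cross_entropy_with_logits."""
    shifted = logits - logits.max(axis=1, keepdims=True)  # for numerical stability
    log_probs = shifted - np.log(np.exp(shifted).sum(axis=1, keepdims=True))
    return -(labels * log_probs).sum(axis=1)

labels = np.array([[1.0, 0.0]])   # one-hot label: class 0 (bottle)
logits = np.array([[2.0, 2.0]])   # an undecided network output
print(softmax_cross_entropy(labels, logits))  # → [0.6931...], i.e. ln(2)
```

An undecided two-class output gives a loss of ln(2); a confidently correct output drives the loss toward zero, which is what `train_step` minimizes.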

3. Validation on the Test Set

There is nothing tricky here, so straight to the code:

import tensorflow as tf
import Ipynb_importer
import matplotlib.pyplot as plt
from TFrecorder import get_batch_record
import numpy as np
from resnet import resnet18

batch_size = 20
filename = "data/record/test.tfrecords"
num_classes = 2
img_w = 224
img_h = 224

x = tf.placeholder(tf.float32, [None, img_w, img_h, 3])
y = tf.placeholder(tf.float32, [None, num_classes])
prediction = resnet18(x)

correct_prediction = tf.equal(tf.argmax(y, 1), tf.argmax(prediction, 1))  # argmax picks the most probable class
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

image_batch, label_batch = get_batch_record(filename, batch_size, img_w, img_h)

init = tf.global_variables_initializer()
saver = tf.train.Saver()

with tf.Session() as sess:
    sess.run(init)
    saver.restore(sess, "net/my_resnet18.ckpt")
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess, coord)

    for i in range(21):
        image, label = sess.run([image_batch, label_batch])
        acc = sess.run(accuracy, feed_dict={x: image, y: label})
        if i % 2 == 0:
            print("acc: " + str(acc))

    coord.request_stop()
    coord.join(threads)
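The accuracy node above (elementwise `tf.equal` on the two argmaxes, averaged) is equivalent to this NumPy computation:

```python
import numpy as np

def batch_accuracy(labels_onehot, logits):
    """Fraction of examples where the predicted class matches the label."""
    correct = np.argmax(labels_onehot, axis=1) == np.argmax(logits, axis=1)
    return float(correct.mean())

labels = np.array([[1, 0], [0, 1], [0, 1], [1, 0]])
logits = np.array([[2.1, 0.3], [0.2, 1.5], [1.1, 0.4], [3.0, 0.1]])
print(batch_accuracy(labels, logits))  # → 0.75 (the third example is misclassified)
```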

4. Predicting a Single Image

Read a single image from a folder and run a prediction on it. The code is:

 

import tensorflow as tf
import Ipynb_importer
from PIL import Image
import matplotlib.pyplot as plt
import numpy as np
from resnet import resnet18
from skimage import io, transform


image_dir = "data/testdata/4.jpg"
MODEL_SAVE_PATH = "net/my_resnet18.ckpt"

def load_image(path):
    # Read the image from the given path
    img = io.imread(path)
    # Normalize pixel values to [0, 1]
    img = img / 255.0
    re_img = transform.resize(img, (224, 224))
    plt.imshow(re_img)
    plt.show()
    img_ready = re_img.reshape((1, 224, 224, 3))
    return img_ready

def prediction_result():
    x = tf.placeholder(tf.float32, [None, 224, 224, 3])
    image = load_image(image_dir)
    y = resnet18(x)
    # Softmax turns the logits into probabilities;
    # index 0 is bottle, index 1 is paper carton
    probabilities = tf.nn.softmax(y)
    # Index of the highest-probability class
    correct_prediction = tf.argmax(y, 1)
    init = tf.global_variables_initializer()
    saver = tf.train.Saver()

    with tf.Session() as sess:
        sess.run(init)
        saver.restore(sess, MODEL_SAVE_PATH)
        probabilities, label = sess.run([probabilities, correct_prediction],
                                        feed_dict={x: image})
        print(probabilities)
        if label == 0:
            print("this is a bottle")
        else:
            print("this is a paper carton")

def main():
    prediction_result()

if __name__ == '__main__':
    main()
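The last step of `prediction_result`, turning raw logits into a class name and a confidence, can be sketched in NumPy (class order assumed [bottle, paper], as elsewhere in this post):

```python
import numpy as np

CLASS_NAMES = ["bottle", "paper"]  # index 0 = bottle, 1 = paper carton

def interpret_logits(logits):
    """Turn the network's raw output into (class name, probability)."""
    shifted = logits - logits.max()            # numerical stability
    probs = np.exp(shifted) / np.exp(shifted).sum()
    idx = int(np.argmax(probs))
    return CLASS_NAMES[idx], float(probs[idx])

name, p = interpret_logits(np.array([3.0, 1.0]))
print(name, round(p, 3))  # → bottle 0.881
```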

 

 

To close, a word on applications. The application scenario can be changed simply by swapping the dataset images. For example, an embedded device can act as a terminal that takes photos and uploads them to a server for recognition. In my graduation project, a Raspberry Pi controlled a camera to photograph objects, uploaded the images to a server, and returned the recognition result via speech and a display, to help kindergarten children learn characters and recognize objects. I will write that part up when I have time. I wrote this post partly to consolidate what I have learned about image classification, and partly as an entry-level reference for anyone who wants to build an image-classification application.

 

