|
05、实现训练流程的执行代码与效果展示_笔记
- # -*- coding: utf-8 -*-
- __author__ = 'dongfangyao'
- __date__ = '2019/3/13 上午10:33'
- __product__ = 'PyCharm'
- __filename__ = '2_image_style_conver'
- import tensorflow as tf
- from tensorflow import logging
- import os
- from tensorflow import gfile
- from PIL import Image
- import time
- import numpy as np
# Emit INFO-level log messages (TF logging is WARN-only by default).
logging.set_verbosity(logging.INFO)
# logging.info('dfy_88888')
# Per-channel (B, G, R) means hard-coded by the original VGG authors;
# subtracted from the input as the network's fixed preprocessing step.
VGG_MEAN = [103.939, 116.779, 123.68]
class VGGNet:
    """
    Builds the VGG16 network structure and loads its parameters from a
    pre-trained model (a numpy dict keyed by layer name).
    """

    def __init__(self, data_dict):
        # data_dict maps layer name -> [weights, biases] (from vgg16.npy).
        self.data_dict = data_dict

    def get_conv_kernel(self, name):
        # Conv parameters: index 0 is the kernel (w), index 1 the bias (b).
        return tf.constant(self.data_dict[name][0], name='conv')

    def get_fc_weight(self, name):
        # Fully-connected weight matrix is stored at index 0.
        return tf.constant(self.data_dict[name][0], name='fc')

    def get_bias(self, name):
        # Bias vector is stored at index 1 (shared layout for conv and fc).
        return tf.constant(self.data_dict[name][1], name='bias')

    def conv_layer(self, inputs, name):
        """
        Build one convolution layer (conv + bias_add + relu).

        :param inputs: input feature map, NHWC
        :param name: layer name; it is also the key into the pre-trained
                     weight dict, so it must match the npy file exactly
        :return: activated feature map
        """
        with tf.name_scope(name):
            """
            Benefits of name_scope: 1. prevents parameter-name clashes,
            2. keeps the TensorBoard visualization tidy.
            Use tf.variable_scope instead when the scope contains
            variables that need training.
            """
            conv_w = self.get_conv_kernel(name)
            conv_b = self.get_bias(name)
            # tf.layers.conv2d() is a higher-level wrapper, but it provides
            # no way to inject fixed kernel values, so it cannot be used
            # here (it is very handy when training a CNN from scratch).
            result = tf.nn.conv2d(input=inputs, filter=conv_w, strides=[1, 1, 1, 1], padding='SAME', name=name)
            result = tf.nn.bias_add(result, conv_b)
            result = tf.nn.relu(result)
            return result

    def pooling_layer(self, inputs, name):
        # tf.layers.max_pooling2d() would work too; pooling has no
        # trainable parameters, so either API is fine here.
        return tf.nn.max_pool(inputs, [1, 2, 2, 1], [1, 2, 2, 1], padding='SAME', name=name)

    def fc_layer(self, inputs, name, activation=tf.nn.relu):
        """
        Build one fully-connected layer.

        :param inputs: input tensor, shape [N, dim]
        :param name: layer name / weight-dict key
        :param activation: activation to wrap the output in, or None
                           for a purely linear layer
        :return: layer output
        """
        with tf.name_scope(name):
            fc_w = self.get_fc_weight(name)
            fc_b = self.get_bias(name)
            # fc: wx + b, a linear transform.
            result = tf.nn.bias_add(tf.matmul(inputs, fc_w), fc_b)
            if activation is None:
                # The final layer of VGG16 must not be relu-activated.
                return result
            else:
                return activation(result)

    def flatten_op(self, inputs, name):
        # Flatten: the subsequent fc layers require a 2-D input.
        with tf.name_scope(name):
            # [NHWC] ---> [N, H*W*C]
            x_shape = inputs.get_shape().as_list()
            dim = 1
            for d in x_shape[1:]:
                dim *= d
            inputs = tf.reshape(inputs, shape=[-1, dim])
            # The ready-made API would also work:
            # return tf.layers.flatten(inputs)
            return inputs

    def build(self, input_rgb):
        """
        Build the VGG16 graph (forward pass only, for feature extraction).
        Conv/pool outputs are stored as attributes (self.conv1_1, ...).

        :param input_rgb: [1, 224, 224, 3] [NHWC]
        :return: None
        """
        start_time = time.time()
        logging.info('building start...')
        # Split along the channel axis (the same API is also used in
        # depthwise-separable convolutions).
        r, g, b = tf.split(input_rgb, num_or_size_splits=3, axis=3)
        # Re-concatenate along the channel axis.
        # VGG expects BGR input (reversed order, like OpenCV) rather than
        # RGB, with the fixed per-channel mean subtracted.
        x_bgr = tf.concat(values=[
            b - VGG_MEAN[0],
            g - VGG_MEAN[1],
            r - VGG_MEAN[2],
        ], axis=3)
        assert x_bgr.get_shape().as_list()[1:] == [224, 224, 3]
        # Build the network.
        # stage 1
        self.conv1_1 = self.conv_layer(x_bgr, 'conv1_1')
        self.conv1_2 = self.conv_layer(self.conv1_1, 'conv1_2')
        self.pool1 = self.pooling_layer(self.conv1_2, 'pool1')
        # stage 2
        self.conv2_1 = self.conv_layer(self.pool1, 'conv2_1')
        self.conv2_2 = self.conv_layer(self.conv2_1, 'conv2_2')
        self.pool2 = self.pooling_layer(self.conv2_2, 'pool2')
        # stage 3
        self.conv3_1 = self.conv_layer(self.pool2, 'conv3_1')
        self.conv3_2 = self.conv_layer(self.conv3_1, 'conv3_2')
        self.conv3_3 = self.conv_layer(self.conv3_2, 'conv3_3')
        self.pool3 = self.pooling_layer(self.conv3_3, 'pool3')
        # stage 4
        self.conv4_1 = self.conv_layer(self.pool3, 'conv4_1')
        self.conv4_2 = self.conv_layer(self.conv4_1, 'conv4_2')
        self.conv4_3 = self.conv_layer(self.conv4_2, 'conv4_3')
        self.pool4 = self.pooling_layer(self.conv4_3, 'pool4')
        # stage 5
        self.conv5_1 = self.conv_layer(self.pool4, 'conv5_1')
        self.conv5_2 = self.conv_layer(self.conv5_1, 'conv5_2')
        self.conv5_3 = self.conv_layer(self.conv5_2, 'conv5_3')
        self.pool5 = self.pooling_layer(self.conv5_3, 'pool5')
        # flatten_op (classifier head not needed for style transfer):
        # self.flatten = self.flatten_op(self.pool5, 'flatten_op')
        #
        # # fc
        # self.fc6 = self.fc_layer(self.flatten, 'fc6')
        # self.fc7 = self.fc_layer(self.fc6, 'fc7')
        # self.fc8 = self.fc_layer(self.fc7, 'fc8', activation=None)
        # self.logits = tf.nn.softmax(self.fc8, name='logits')
        logging.info('building end... 耗时%3d秒' % (time.time() - start_time))
#
# Quick smoke test of the graph construction (kept for reference):
# vgg16_for_result = VGGNet(data_dict)
# image_rgb = tf.placeholder(dtype=tf.float32, shape=[1, 224, 224, 3], name='image_rgb')
# vgg16_for_result.build(image_rgb)
# print(vgg16_for_result.conv1_1)
# print(vgg16_for_result.flatten)
# print(vgg16_for_result.fc6)
"""
Modules (implemented one at a time below):
1. Define input files and the output directory
2. Manage the model hyper-parameters
3. Data supply (content image, style image, randomly initialized image)
4. Build the computation graph (dataflow graph, loss, train_op)
5. Training execution (run inside a session on CPU / GPU / TPU)
"""
# (1) Input files and output directory.
vgg16_npy_path = './vgg16.npy'
content_img_path = './dfy_88888.png'
style_img_path = './style.png'
output_dir = './output_imgs'
if not gfile.Exists(output_dir):
    gfile.MakeDirs(output_dir)
def get_default_params():
    """Return the hyper-parameters for the style-transfer run.

    :return: tf.contrib.training.HParams holding the learning rate and
             the content/style loss weights.
    """
    hparams = tf.contrib.training.HParams(
        learning_rate=10,
        lambda_content_loss=0.1,
        lambda_style_loss=3000,
    )
    return hparams
# (2) Hyper-parameters; echo a couple of them as a sanity check.
hps = get_default_params()
print(hps.learning_rate)
print(hps.lambda_content_loss)
def read_img(image_name):
    """Read an image file and return it as a float32 batch of one.

    :param image_name: path to the image file (expected 224x224 here)
    :return: np.float32 array of shape [1, H, W, 3]
    """
    img = Image.open(image_name)
    # Normalize to 3-channel RGB up front. This keeps the original
    # RGBA -> RGB behavior and additionally handles grayscale and
    # palette images, on which the previous `np_img[:, :, 0:3]` slice
    # crashed (a 2-D array has no channel axis).
    np_img = np.array(img.convert('RGB'))
    print('np_img shape:', np_img.shape, image_name)
    # Add the leading batch dimension: [H, W, 3] -> [1, H, W, 3].
    np_img = np.asarray([np_img], dtype=np.float32)
    print('np_img shape: ', np_img.shape)
    # (1, 224, 224, 3)
    return np_img
# read_img(content_img_path)
# (3) Load the two fixed inputs as numpy arrays of shape [1, 224, 224, 3].
content_img_arr_val = read_img(content_img_path)
style_img_arr_val = read_img(style_img_path)
def initial_image(shape, mean, stddev):
    """Create the trainable result image as a TF variable.

    :param shape: tensor shape, e.g. [1, 224, 224, 3]
    :param mean: mean of the truncated normal initializer
    :param stddev: standard deviation of the initializer
    :return: trainable tf.Variable initialized with truncated-normal noise
    """
    noise = tf.truncated_normal(
        shape=shape, mean=mean, stddev=stddev, dtype=tf.float32)
    return tf.Variable(initial_value=noise, trainable=True)
# Trainable result image, initialized around mid-gray (255//2 = 127).
result_img_val = initial_image([1, 224, 224, 3], mean=255//2, stddev=20)
# Placeholders: concrete values are fed via feed_dict in the session below.
content_img = tf.placeholder(dtype=tf.float32, shape=[1, 224, 224, 3], name='content_img')
style_img = tf.placeholder(dtype=tf.float32, shape=[1, 224, 224, 3], name='style_img')
# Extract conv-layer features for all three images.
# allow_pickle=True is required on NumPy >= 1.16.3, where the default
# flipped to False and loading this object-array .npy raises
# "ValueError: Object arrays cannot be loaded when allow_pickle=False";
# it is a no-op on the older NumPy versions this script was written for.
vgg16_data = np.load(vgg16_npy_path, encoding='latin1', allow_pickle=True)
# The .npy file wraps a single dict in a 0-d object array; .item() unwraps it.
data_dict = vgg16_data.item()
# Three separate graph builds sharing the same constant weights.
vgg16_for_result_img = VGGNet(data_dict)
vgg16_for_content_img = VGGNet(data_dict)
vgg16_for_style_img = VGGNet(data_dict)
# Build VGG16 over the (trainable) result image.
vgg16_for_result_img.build(result_img_val)
# Build VGG16 over the content image.
vgg16_for_content_img.build(content_img)
# Build VGG16 over the style image.
vgg16_for_style_img.build(style_img)
# Choose which CNN layers' activations to use as features.
# Content features of the content image: lower layers work better.
# shape: [NHWC] — feature maps after the conv layers' activation.
content_features = [
    # vgg16_for_content_img.conv1_1,
    vgg16_for_content_img.conv2_1,
    # vgg16_for_content_img.conv3_1,
    # vgg16_for_content_img.conv3_2,
    # vgg16_for_content_img.conv5_1,
    # vgg16_for_content_img.conv5_3,
]
# Content features of the result image — must use the SAME layers.
result_content_features = [
    # vgg16_for_result_img.conv1_1,
    vgg16_for_result_img.conv2_1,
    # vgg16_for_result_img.conv3_1,
    # vgg16_for_result_img.conv3_2,
    # vgg16_for_result_img.conv5_1,
    # vgg16_for_result_img.conv5_3,
]
# Style features of the style image: higher layers work better.
style_features = [
    # vgg16_for_style_img.conv1_1,
    # vgg16_for_style_img.conv2_1,
    # vgg16_for_style_img.conv3_1,
    # vgg16_for_style_img.conv4_2,
    vgg16_for_style_img.conv4_3,
    vgg16_for_style_img.conv5_3,
]
# Style features of the result image — must use the SAME layers.
result_style_features = [
    # vgg16_for_result_img.conv1_1,
    # vgg16_for_result_img.conv2_1,
    # vgg16_for_result_img.conv3_1,
    # vgg16_for_result_img.conv4_2,
    vgg16_for_result_img.conv4_3,
    vgg16_for_result_img.conv5_3,
]
# loss = loss_content + loss_style
content_loss = tf.zeros(shape=1, dtype=tf.float32)
# Reminder of tf.zeros shapes:
# shape [2, 2] ---> [[0, 0],[0, 0]]
# shape 2 ---> [0, 0]
# shape 1 ----> [0]
# shape 0 ----> 0 (scalar)
# MSE (mean squared error) between corresponding feature maps.
# zip([1, 2], [3, 4]) ---> [(1, 3), (2, 4)]
for c, c_result in zip(content_features, result_content_features):
    # c and c_result are [NHWC]; averaging over axes 1,2,3 leaves shape [N].
    content_loss += tf.reduce_mean(tf.square(c - c_result), axis=[1, 2, 3])
# Gram matrix: a measure of pairwise correlation between feature maps.
def gram_matrix(x):
    """Compute the normalized Gram matrix of a conv feature map.

    Measures the pairwise similarity between the k feature maps
    (k = channels), producing a k*k matrix.

    :param x: tensor of shape [NHWC], i.e. [1, height, width, channels]
    :return: tensor of shape [channels, channels]
    """
    batch, height, width, channels = x.get_shape().as_list()
    # Collapse the spatial dimensions: [b, h*w, c].
    flat = tf.reshape(x, shape=[batch, height * width, channels])
    # flat[0] has shape [h*w, c]; flat[0]^T @ flat[0] gives [c, c],
    # normalized by the number of contributing elements.
    normalizer = tf.constant(height * width * channels, tf.float32)
    return tf.matmul(flat[0], flat[0], transpose_a=True) / normalizer
# List comprehensions building one Gram matrix per selected style layer.
style_gram_matrix = [gram_matrix(feature) for feature in style_features]
result_style_gram_matrix = [gram_matrix(feature) for feature in result_style_features]
style_loss = tf.zeros(shape=1, dtype=tf.float32)
for s, s_result in zip(style_gram_matrix, result_style_gram_matrix):
    # s and s_result are [c, c] Gram matrices; MSE over both axes.
    style_loss += tf.reduce_mean(tf.square(s - s_result), axis=[0, 1])
# Final loss: weighted sum of the content loss and the style loss.
loss = hps.lambda_content_loss * content_loss + hps.lambda_style_loss * style_loss
with tf.name_scope('train_op'):
    train_op = tf.train.AdamOptimizer(hps.learning_rate).minimize(loss)
# Any graph containing variables must run global variable initialization.
init_op = tf.global_variables_initializer()
num_steps = 100
with tf.Session() as sess:
    sess.run(init_op)
    for step in range(num_steps):
        fetches = [loss, content_loss, style_loss, train_op]
        loss_value, content_loss_value, style_loss_value, _ = sess.run(fetches=fetches, feed_dict={
            content_img: content_img_arr_val,
            style_img: style_img_arr_val
        })
        logging.info('Step:%d, loss_value: %8.4f, content_loss_value: %8.4f, style_loss_value: %8.4f' %
                     ((step+1), loss_value, content_loss_value, style_loss_value))
        # Save the image generated at every step to the output directory.
        # result_img_val shape : [1, 224, 224, 3]
        # result_image = sess.run(result_img_val)[0]
        # NOTE(review): tf.Variable.eval takes the session as its first
        # positional argument, so eval(sess) works; sess.run (commented
        # out above) would be the more conventional spelling.
        result_image = result_img_val.eval(sess)[0]
        # result_image shape : [224, 224, 3]
        # Pixel values must lie in [0, 255]; clip before casting to uint8.
        result_image = np.clip(result_image, 0, 255)
        # TypeError: data type not understood
        # result_image = np.asarray(result_image, dtype=tf.uint8)
        result_image = np.asarray(result_image, dtype=np.uint8)
        # result_image = np.asarray([result_image], dtype=np.uint8)
        img = Image.fromarray(result_image)
        result_image_path = os.path.join(output_dir, 'result-%05d.png' % (step+1))
        img.save(result_image_path)
        pass
复制代码
|
|