Serverless架构下怎么优雅的上传文件?

anycodes刚刚开发经验2830438

前言

在传统开发过程中,我们对文件上传部分相对来说是比较自由:上传什么文件/怎么上传/存储到哪里等问题的决定权往往在我们自己这里,并没有太多的问题。但是在Serverless架构下,我们往往上传文件就没有这么自由了。无论是成本的原因还是某些服务的限制,我们都需要寻求一些比较"优"的解决方案。

Serverless架构与文件上传

由于Serverless架构中的函数计算部分是没有办法做文件持久化的,因为函数执行的容器,用完过后会被回收,所以也就是说你想要存储文件需要借助对象存储等相关的服务。当然将文件上传到对象存储服务的方法有很多,这里只说两种:

  • 函数计算->对象存储

  • 对象存储

一般情况下,我们一个业务如果有文件上传功能,我们通常都会用multipart/form-data,或者将文件进行base64编码之后再上传。但是在Serverless架构下,这种思路要有一些变化。

首先说函数计算->对象存储这种方法,这种方法上传文件,是相对来说比较容易,也是比较常见的,我们将文件直接通过API网关,传送到云函数中,在云函数重做一些处理(例如压缩图像,视频转码,数据入库等),然后再由云函数将结果存储到对象存储中,做文件资源的持久化。这种做法是比较流畅的,也是正常思路。但是理想很丰满,现实很骨感:

  • 首先说直接通过multipart/form-data上传,将文件直接通过网关传给函数的问题,函数计算通过API网关获得到的数据结构,一般来说都是一个JSON格式,或者某些字段干脆就是字符串,所以这样的一个设定,会让函数计算中对二进制的支持非常不友好;所以我们只能通过将文件转换为base64编码之后在进行传输,通过API网关之后,函数部分收到这个数据,再将base64编码的文件解码,然后做一些处理之后,持久化对象存储中。

  • 其次无论是腾讯云的SCF,还是AWS的Lambda,在通过API网关触发函数的时候,都是有一个数据包大小限制的。以腾讯云为例,这个限制是6M。也就是说,你无论发送多大的数据,在API网关到函数计算部分,是有一个数据包的最大限制,如果你上传文件过大,这里就无法进行资源的传输。也就是说,通过上传文件到云函数这个case只能上传6M以下的文件。那么这个6M是一个什么概念呢?上文已经描述过,函数计算对接受二进制文件表现的非常不友好,所推荐将文件base64编码之后传输,编码之后的数据包通常会变大一些,一就是说通过这种方法,我们上传到云函数的数据包可能只有4M左右。那么4M又是一个什么概念呢:


    上图所示,我拿出了我的手机,拍了一张图片,图片大小是6.21M,也就是说如果我想把这张图片上传到SCF来进行一些处理是“不可能”事件,或者说,我就做了一个相册的功能,我就直接把图片上传到函数计算,函数计算再将其存储到对象存储中,这个操作是因为数据包大小而被限制住。

当然,上面这种方法除了对文件大小限制之外,对成本也是有一定影响的,因为API网关相对来说并不是一个对文件进行传输的服务,为什么这么说的,我们就单纯的从流量费用来看对象存储和API网关的区别:

  • API网关的收费:

  • 对象存储的收费:

可以看到单纯根据流量来看API网关的费用就比COS高了很多,其实也可以理解,毕竟API网关更多定位可能是控制流,而真正的数据存储传输这一部分还是对象存储更合适一些。那么我们有没有什么方案可以直接上传文件等资源到对象存储呢?如果我们直接将资源上传到对象存储,这条资源数据又如何入库呢(例如用户上传图片到自己的相册功能,使用传统方法,系统接收到图片,将图片存储,将数据入库,但是如果图片直接上传到对象存储,我们怎么会知道这个图片是那个用户给我们的)?同时,将文件上传到对象存储需要写入权限,那么是将权限开发?还是使用密钥?如果是一个Web服务,这个密钥信息又应该存储在哪里?如何存储?

所以此时此刻,就衍生出了第二种解决方法:

在直传对象存储方法中,客户端发起三个请求,分别是获取临时上传地址、将文件上传到COS、获取处理结果(当然,如果不需要获取处理结果什么的,例如就是用户单纯的上传个文件到自己的账号下,那这种情况就不需要第三次请求了)。

相对于之前的方法,这种方法会稍微复杂一下,但是这种方法对二进制上传、对文件资源的大小以及成本控制都能,都有很好的支持。当然,也并不是说每种场景都只用这个方案,因为不同的方案,在不同的场景中,可能真的有一定的差异,但是不管怎么说,在Serverless架构下,这个方案都是较优的。

针对不同场景的的不同适用方案:

  • 场景1: 用户上传头像功能
    针对这样的场景,其实直接选用方案1,就可以了。因为一般情况下,头像都不是很大的,完全可以在客户端对图像进行一次压缩和裁剪,完成之后,直接带着用户的一些参数,例如用户的token等,上传到函数计算,在函数计算中对图片转存到对象存储以及将图像和用户信息进行关联,并将某些结果返回给客户端。整个流程只需要一个函数,方便快捷。

  • 场景2: 用户上传图片到相册系统中
    针对这样的场景,其实方案2是更好的,因为很多时候上传图片到相册,都是会希望保留原图,而不希望被压缩,那么原图大小很可能超过6M,方案1也并不是十分合理,而且APIGW+函数计算的组合,本身就不是非常适合进行文件的传输等,这个时候优先上传对象存储是比较合理的方案。
    用户可以带着图像要上传的相册以及图片名称,用户的token发起获取临时密钥到函数1中,函数1将用户、相册、图片以及状态(例如待上传、待处理、已处理等)等信息关联,并且存储,然后将临时地址返回给客户端,客户端将图片上传到对象存储中,通过对象存储触发器触发函数2,函数2对图像进行压缩(一般情况下,相册列表都会显示压缩图片,点到相册详情才会有完整的无损图片),并且和之前信息进行关联,修改数据状态。在用户上传图片完成之后,如果有需要,客户端就可以发起第三次请求获取图像存储/处理结果,函数3会查询数据库状态,在某个时间阈值内,如果数据状态是完成,则表示数据已经上传并且完成了部分处理,否则会返回对应的异常信息。

代码实例

接下来分享上面两种方法的实现过程:
函数1,实现第一种方案,文件通过Base64,传递到SCF,由SCF转存到COS:

def uploadToScf(event, context):
    print('event', event)
    print('context', context)
    body = json.loads(event['body'])

    # 可以通过客户端传来的token进行鉴权,只有鉴权通过才可以获得临时上传地址
    # 这一部分可以按需修改,例如用户的token可以在redis获取,可以通过某些加密方法获取等
    # 也可以是传来一个username和一个token,然后去数据库中找这个username对应的token是否
    # 与之匹配等,这样会尽可能的提升安全性
    if "key" not in body or "token" not in body or body['token'] != 'mytoken' or "key" not in body:
        return {"url"None}

    pictureBase64 = body["picture"].split("base64,")[1]
    with open('/tmp/%s' % body['key'], 'wb'as f:
        f.write(base64.b64decode(pictureBase64))
    region = os.environ.get("region")
    secret_id = os.environ.get("TENCENTCLOUD_SECRETID")
    secret_key = os.environ.get("TENCENTCLOUD_SECRETKEY")
    token = os.environ.get("TENCENTCLOUD_SESSIONTOKEN")
    config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token)
    client = CosS3Client(config)
    response = client.upload_file(
        Bucket=os.environ.get("bucket_name"),
        LocalFilePath='/tmp/%s' % body['key'],
        Key=body['key'],
    )
    再说回来,时至今日,主流云厂商都已经开始开发探索Serverless架构,那么各个云厂商的冷启动已经"热"到了什么程度?

由于我是国内的开发者,所以我将测试分为两部分,一部分是国内云厂商(腾讯云、阿里云、华为云),另一部分是国外云厂商(AWS、谷歌)。

由于在实际生产过程中,冷启动的诞生往往是和API网关共同体现,也就是说,实际上让用户感知相对明显,或者比较常见感知到冷启动出现的情况,通常是函数与网关结合,做了一个接口/服务,访问该服务的时候,放大/体现了冷启动。

所以,我的做法很简单和暴力,通过函数与API网关触发器的结合,针对不同厂商来创建一个API服务,通过本地机器来对该服务进行访问,在客户端判断其耗时。当然,这种做法诚然不能精确的表现出冷启动的具体数据,因为网络因素也将会是影响其准确性的一个重要因素,但是至少可以大概的对比出,不同云厂商的冷启动优化情况,以及服务稳定情况。

国内的云厂商,在测试的时候更多使用的是国内区,国外云厂商则是国外区,这样就会出现一个额外的问题,国内外云厂商的数据不具有对比性,毕竟网络因素占了很大的一部分,所以本次对比将会是国内和国内对比,国外和国外对比。

客户端程序:

import time, json
import urllib.request
import matplotlib.pyplot as plt
import numpy
from multiprocessing import Process, Manager


# 测试地址
# qcloud
url = "https://service-px5f98f4-1256773370.gz.apigw.tencentcs.com/release/scf_demo"
# # huaweicloud
# url = "https://2937587fe6ce4e6eb22d521d1d9b811c.apig.cn-east-2.huaweicloudapis.com/demo"
# # aliyun
# url = "https://50155512.cn-shanghai.fc.aliyuncs.com/2016-08-15/proxy/guide-hello_world/mydemo/"
# # aws
# url = "https://di6vbxf2lk.execute-api.us-east-1.amazonaws.com/default/mydemo"
# # GoogleCloud
# url = "https://us-central1-meta-imagery-277209.cloudfunctions.net/mydemo"

# 此时
times = 200

# 串行处理
serialColdStart = []
serialHotStart = []

for i in range(0,times):
    timeStart = time.time()
    responseAttr = urllib.request.urlopen(url)
    endTime = time.time()

    response = json.loads(responseAttr.read().decode("utf-8"))

    if response['isNew']:
        serialColdStart.append(endTime-timeStart)
    else:
        serialHotStart.append(endTime-timeStart)


# 并行处理
def worker(url, return_list):
    timeStart = time.time()
    responseAttr = urllib.request.urlopen(url)
    endTime = time.time()
    return_list.append({
        "duration": endTime-timeStart,
        "response": json.loads(responseAttr.read().decode("utf-8"))
    })

manager = Manager()
return_list = manager.list()
jobs = []
for i in range(times):
    p = Process(target=worker, args=(url ,return_list))
    jobs.append(p)
    p.start()

for proc in jobs:
    proc.join()

parallelColdStart = []
parallelHotStart = []
for eveData in return_list:
    if eveData['response']['isNew']:
        parallelColdStart.append(eveData['duration'])
    else:
        parallelHotStart.append(eveData['duration'])


# 数据汇总
print("-"*10"串行测试""-"*10)
print("总触发次数:", len(serialColdStart) + len(serialHotStart))
print("冷启动次数:", len(serialColdStart))
print("热启动次数:", len(serialHotStart))
print("最大耗时量:", max(serialColdStart + serialHotStart))
print("最小耗时量:", min(serialColdStart + serialHotStart))
print("平均耗时量:", numpy.mean(serialColdStart + serialHotStart))

print("-"*10"并行测试""-"*10)
print("总触发次数:", len(parallelColdStart) + len(parallelHotStart))
print("冷启动次数:", len(parallelColdStart))
print("热启动次数:", len(parallelHotStart))
print("最大耗时量:", max(parallelColdStart + parallelHotStart))
print("最小耗时量:", min(parallelColdStart + parallelHotStart))
print("平均耗时量:", numpy.mean(parallelColdStart + parallelHotStart))

plt.figure(figsize=(15,10))
plt.subplot(421)
plt.title('(Serial) Cold Start Time')
plt.plot(range(0, len(serialColdStart)), serialColdStart)
plt.subplot(423)
plt.title('(Serial) Cold Start Time')
plt.hist(serialColdStart, bins=20)
plt.subplot(425)
plt.title('(Serial) Hot Start Time')
plt.plot(range(0, len(serialHotStart)), serialHotStart)
plt.subplot(427)
plt.title('(Serial) Hot Start Time')
plt.hist(serialHotStart, bins=20)
plt.subplot(422)
plt.title('(Parallel) Hot Start Time')
plt.plot(range(0, len(parallelColdStart)), parallelColdStart)
plt.subplot(424)
plt.title('(Parallel) Cold Start Time')
plt.hist(parallelColdStart, bins=20)
plt.subplot(426)
plt.title('(Parallel) Hot Start Time')
plt.plot(range(0, len(parallelHotStart)), parallelHotStart)
plt.subplot(428)
plt.title('(Parallel) Hot Start Time')
plt.hist(parallelHotStart, bins=20)
plt.show()import time, json
import urllib.request
import matplotlib.pyplot as plt
import numpy
from multiprocessing import Process, Manager


# 测试地址
# qcloud
url = "https://service-px5f98f4-1256773370.gz.apigw.tencentcs.com/release/scf_demo"
# # huaweicloud
# url = "https://2937587fe6ce4e6eb22d521d1d9b811c.apig.cn-east-2.huaweicloudapis.com/demo"
# # aliyun
# url = "https://50155512.cn-shanghai.fc.aliyuncs.com/2016-08-15/proxy/guide-hello_world/mydemo/"
# # aws
# url = "https://di6vbxf2lk.execute-api.us-east-1.amazonaws.com/default/mydemo"
# # GoogleCloud
# url = "https://us-central1-meta-imagery-277209.cloudfunctions.net/mydemo"

# 此时
times = 200

# 串行处理
serialColdStart = []
serialHotStart = []

for i in range(0,times):
    timeStart = time.time()
    responseAttr = urllib.request.urlopen(url)
    endTime = time.time()

    response = json.loads(responseAttr.read().decode("utf-8"))

    if response['isNew']:
        serialColdStart.append(endTime-timeStart)
    else:
        serialHotStart.append(endTime-timeStart)


# 并行处理
def worker(url, return_list):
    timeStart = time.time()
    responseAttr = urllib.request.urlopen(url)
    endTime = time.time()
    return_list.append({
        "duration": endTime-timeStart,
        "response": json.loads(responseAttr.read().decode(import ImageCaptcha


NUMBER = ['0''1''2''3''4''5''6''7''8''9']
LOW_CASE = ['a''b''c''d''e''f''g''h''i''j''k''l''m''n''o''p''q''r''s''t''u',
            'v''w''x''y''z']
UP_CASE = ['A''B''C''D''E''F''G''H''I''J''K''L''M''N''O''P''Q''R''S''T''U',
           'V''W''X''Y''Z']

CAPTCHA_LIST = NUMBER
CAPTCHA_LEN = 4         # 验证码长度
CAPTCHA_HEIGHT = 60     # 验证码高度
CAPTCHA_WIDTH = 160     # 验证码宽度


def random_captcha_text(char_set=CAPTCHA_LIST, captcha_size=CAPTCHA_LEN):
    """
    随机生成定长字符串
    :param char_set: 备选字符串列表
    :param captcha_size: 字符串长度
    :return: 字符串
    """

    captcha_text = [random.choice(char_set) for _ in range(captcha_size)]
    return ''.join(captcha_text)


def gen_captcha_text_and_image(width=CAPTCHA_WIDTH, height=CAPTCHA_HEIGHT, save=None):
    """
    生成随机验证码
    :param width: 验证码图片宽度
    :param height: 验证码图片高度
    :param save: 是否保存(None)
    :return: 验证码字符串,验证码图像np数组
    """

    image = ImageCaptcha(width=width, height=height)
    # 验证码文本
    captcha_text = random_captcha_text()
    captcha = image.generate(captcha_text)
    # 保存
    if save:
        image.write(captcha_text, './img/' + captcha_text + '.jpg')
    captcha_image = Image.open(captcha)
    # 转化为np数组
    captcha_image = np.array(captcha_image)
    return captcha_text, captcha_image


if __name__ == '__main__':
    t, im = gen_captcha_text_and_image(save=True)
    print(t, im.shape)      # (60, 160, 3)

这一部分主要用户生成验证码,目前CAPTCHA_LIST = NUMBER,表示只用数字验证码,如果需要英文大小写,可将LOW_CASEUP_CASE加到CAPTCHA_LIST中。

组件

# -*- coding:utf-8 -*-
# name: util.py

import numpy as np
from captcha_gen import gen_captcha_text_and_image
from captcha_gen import CAPTCHA_LIST, CAPTCHA_LEN, CAPTCHA_HEIGHT, CAPTCHA_WIDTH


def convert2gray(img):
    """
    图片转为黑白,3维转1维
    :param img: np
    :return:  灰度图的np
    """

    if len(img.shape) > 2:
      &n

作者简介:刘宇,毕业于浙江大学,硕士学历,目前在腾讯工作,著有《Serverless 架构》一书,是Serverless架构的热衷者,曾做一款叫Anycodes的软件,目前下载超过100万次。

返回列表

上一篇:未命名

下一篇:未命名

相关文章

Serverless架构中的无状态性指的是什么?

Serverless架构中的无状态性指的是什么?

前言接触Serverless架构的人,或者说接触函数计算的人,很多都会听过这样一句话:Serverless是无状态。众所周知,无状态就是没有状态的意思,也就是说我们没办法用它保存状态,因为用完即销毁。...

评论列表

tina
2020-05-29 14:05:42

很有帮助

爱好者
2020-04-15 01:25:49

挺不错的。

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。