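Table of contents
- aws (study notes, lesson 36): calling rekognition from lambda (celebrity video analysis AI)
- What you will learn:
- 1. Overall architecture
- 1.1 Overall processing architecture
- 1.2 Code link
- 1.3 Code changes
- 2. Code analysis
- 2.1 Creating the `S3 bucket` for uploaded `video` files
- 2.2 Creating the `SNS topic`
- 2.3 Creating the `role` needed by `start_processing_lambda`
- 2.3.1 `AmazonSNSFullAccess` permission
- 2.3.2 `S3 bucket` permission
- 2.3.3 `rekognition` permission
- 2.3.4 `PassRole` permission
- 2.4 Creating `start_rekognition_lambda`
- 2.5 Creating `process_video_lambda`
- 2.6 Granting permissions to `start_rekognition_lambda` and `process_video_lambda`
- 2.6.1 Granting permissions to `start_rekognition_lambda`
- 2.6.2 Granting permissions to `process_video_lambda`
- 2.7 Subscribing `process_video_lambda` to the `SNS event`
- 2.8 Adding a `trigger` to `start_rekognition_lambda`
- 2.9 `lambda` code walkthrough
- 2.9.1 `start_rekognition_lambda` code
- 2.9.2 `process_video_lambda` code
- 3. Deployment and execution
- 3.1 Test `video`
- 3.2 Deploying
- 3.3 Video analysis
- 3.3.1 Uploading the video
- 3.3.3 Checking the results in `cloudwatch`
- 3.3.4 cleanup stack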
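aws (study notes, lesson 36): calling rekognition from lambda (celebrity video analysis AI)
- Use `lambda` to call `rekognition` (celebrity video analysis)
What you will learn:
- Using `rekognition` (celebrity video analysis)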
1. Overall architecture
1.1 Overall processing architecture
1.2 Code link
Code link (rekognition-video-processor)
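1.3 Code changes
The original source writes all of its results with `print` statements, and in this test that output did not show up as expected in `cloudwatch`. The `lambda` code is therefore changed as follows.
- Add the following code

    import logging

    logging.getLogger().setLevel(logging.INFO)
    logger = logging.getLogger(__name__)

- Change the `print` calls to `logger.info`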
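As a minimal sketch of this change, the snippet below configures the root logger the same way and routes a message through `logger.info`. The helper name `report` is made up for illustration and is not part of the original project.

```python
import logging

# Lower the root logger threshold so INFO records are not filtered out
# (the root logger defaults to WARNING).
logging.getLogger().setLevel(logging.INFO)
logger = logging.getLogger(__name__)


def report(message: str) -> None:
    """Hypothetical helper: what a former print(message) call becomes."""
    logger.info(message)


report("recognition has been started.")
```

Run locally with no handler configured, this will most likely print nothing, because Python's fallback handler only emits warnings and above. Inside Lambda the runtime is expected to attach its own handler to the root logger, which is why setting the level is enough there.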
2. Code analysis
2.1 Creating the `S3 bucket` for uploaded `video` files

    # S3 bucket to store videos
    video_bucket = s3.Bucket(
        self,
        "S3Bucket",
        removal_policy=RemovalPolicy.DESTROY,
    )

2.2 Creating the `SNS topic`

    # SNS
    rekognition_sns_topic = sns.Topic(
        self,
        "RekognitionSnsTopic",
        display_name="Rekognition Job Completion SNS Topic",
    )

Once the Rekognition job finishes (`process complete`), the result is announced through this `SNS topic`, and a `lambda` then handles it.
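2.3 Creating the `role` needed by `start_processing_lambda`
2.3.1 `AmazonSNSFullAccess` permission

    rekognition_role = iam.Role(
        self,
        "RekognitionRole",
        assumed_by=iam.ServicePrincipal("rekognition.amazonaws.com"),
        managed_policies=[
            iam.ManagedPolicy.from_aws_managed_policy_name("AmazonSNSFullAccess"),
        ],
    )

An `SNS event` has to be published once `video` processing completes. This role is assumed by the Rekognition service (`rekognition.amazonaws.com`) and carries the `AmazonSNSFullAccess` managed `policy`; `start_processing_lambda` passes it to Rekognition when it starts a job.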
2.3.2 `S3 bucket` permission

    # Define IAM permissions needed
    s3_lambda_policy = iam.PolicyStatement(
        actions=["s3:GetObject"],
        resources=[video_bucket.bucket_arn, video_bucket.bucket_arn + "/*"],
        effect=iam.Effect.ALLOW,
    )

The uploaded video in the `S3 bucket` has to be read, so `s3:GetObject` permission on the `S3 bucket` is required.
2.3.3 `rekognition` permission

    rekognition_lambda_policy = iam.PolicyStatement(
        actions=["rekognition:*"],
        resources=["*"],
        effect=iam.Effect.ALLOW,
    )

The video analysis is carried out by `rekognition`, so `rekognition` permissions are granted.
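2.3.4 `PassRole` permission

    pass_role_lambda_policy = iam.PolicyStatement(
        actions=["iam:PassRole"],
        resources=[rekognition_role.role_arn],
    )

When `video processing` ends, Rekognition has to `publish` an `event` to `SNS` using `rekognition_role`. The lambda hands that role to Rekognition when it starts the job, so it needs `iam:PassRole` on the role.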
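To make the statements from 2.3.2 through 2.3.4 more concrete, here is a rough sketch of the IAM policy document they add up to, written as a plain Python dictionary. The function name `build_lambda_policy` and the ARNs in the usage lines are hypothetical placeholders; the real document is generated by CDK at synthesis time and may be laid out differently.

```python
import json


def build_lambda_policy(bucket_arn: str, rekognition_role_arn: str) -> dict:
    """Assemble a policy document mirroring the three CDK PolicyStatements."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # 2.3.2: read the uploaded video
                "Effect": "Allow",
                "Action": ["s3:GetObject"],
                "Resource": [bucket_arn, f"{bucket_arn}/*"],
            },
            {
                # 2.3.3: start and query Rekognition jobs
                "Effect": "Allow",
                "Action": ["rekognition:*"],
                "Resource": ["*"],
            },
            {
                # 2.3.4: hand the SNS publishing role to Rekognition
                "Effect": "Allow",
                "Action": ["iam:PassRole"],
                "Resource": [rekognition_role_arn],
            },
        ],
    }


# Placeholder ARNs, for illustration only.
policy = build_lambda_policy(
    "arn:aws:s3:::example-video-bucket",
    "arn:aws:iam::123456789012:role/ExampleRekognitionRole",
)
print(json.dumps(policy, indent=2))
```

Note that `rekognition:*` on `*` is broad. For anything beyond a demo, narrowing it to the two actions the lambdas actually call would be a reasonable tightening.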
2.4 Creating `start_rekognition_lambda`

    # Lambda which detects when a video has been uploaded to the S3 bucket
    # and starts the video processing with Rekognition
    start_processing_lambda_function = lambda_.Function(
        self,
        "LambdaFunction",
        function_name="start-processing-rekognition-demo-lambda",
        runtime=lambda_.Runtime.PYTHON_3_10,
        handler="index.lambda_handler",
        code=lambda_.Code.from_asset("lambdas/start_processing"),
        environment={
            "SNS_TOPIC_ARN": rekognition_sns_topic.topic_arn,
            "SNS_ROLE_ARN": rekognition_role.role_arn,
        },
    )

Here, `environment` passes two parameters:
- One is the `SNS topic arn`. When `process complete` occurs, an `SNS event` is published to this topic to tell `process_video_lambda` to handle the result.
- The other is the `arn` of `rekognition_role`.
2.5 Creating `process_video_lambda`

    # Lambda which detects when a video has been processed by Rekognition.
    # It extracts the data of each celebrity identified
    process_video_lambda = lambda_.Function(
        self,
        "RekognitionLambda",
        function_name="process-video-rekognition-demo-lambda",
        runtime=lambda_.Runtime.PYTHON_3_10,
        handler="index.lambda_handler",
        code=lambda_.Code.from_asset("lambdas/process_video"),
    )

This `lambda` does not do any actual `video processing`. It only outputs the results of the job that `start_rekognition_lambda` started.
2.6 Granting permissions to `start_rekognition_lambda` and `process_video_lambda`
2.6.1 Granting permissions to `start_rekognition_lambda`

    # Grant permissions to the lambdas defined
    start_processing_lambda_function.add_to_role_policy(s3_lambda_policy)
    start_processing_lambda_function.add_to_role_policy(rekognition_lambda_policy)
    start_processing_lambda_function.add_to_role_policy(pass_role_lambda_policy)

2.6.2 Granting permissions to `process_video_lambda`

    process_video_lambda.add_to_role_policy(rekognition_lambda_policy)

2.7 Subscribing `process_video_lambda` to the `SNS event`

    rekognition_sns_topic.grant_publish(process_video_lambda)
    rekognition_sns_topic.add_subscription(
        sns_subs.LambdaSubscription(process_video_lambda)
    )

With this subscription in place, `process_video_lambda` is notified of the `video_process_complete event`.
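To illustrate what the subscribed lambda receives, the sketch below pulls the job ID and status out of an SNS-triggered event. The function name `extract_job_updates` and the sample event are made up for illustration, and the sample is trimmed to the two fields that the lambda in this project reads (`JobId` and `Status`); a real notification carries more fields.

```python
import json


def extract_job_updates(event: dict) -> list[tuple[str, str]]:
    """Return (job_id, status) pairs from an SNS-triggered Lambda event."""
    updates = []
    for record in event.get("Records", []):
        # The SNS message body arrives as a JSON string, not a dict.
        message = json.loads(record["Sns"]["Message"])
        updates.append((message["JobId"], message["Status"]))
    return updates


# Made-up sample event, reduced to the fields used above.
sample_event = {
    "Records": [
        {
            "Sns": {
                "Message": json.dumps(
                    {"JobId": "example-job-id", "Status": "SUCCEEDED"}
                )
            }
        }
    ]
}
print(extract_job_updates(sample_event))
```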
2.8 Adding a `trigger` to `start_rekognition_lambda`

    # Automatically trigger lambda when a new object is uploaded to S3
    start_processing_lambda_function.add_event_source(
        aws_lambda_event_sources.S3EventSource(
            video_bucket, events=[s3.EventType.OBJECT_CREATED]
        )
    )

Every upload to this particular `video_bucket` triggers `start_rekognition_lambda`; the handler then starts a Rekognition job only when the uploaded file is an `mp4`.
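The filtering step can be sketched on its own, without any AWS calls. The function `find_videos` and the sample event below are hypothetical. Unlike the lambda in this project, which reads only the first record, the sketch walks every record, decodes the object key (S3 event notifications URL-encode keys, so a space arrives as `+`), and compares the extension case-insensitively.

```python
import os
from urllib.parse import unquote_plus


def find_videos(event: dict, extension: str = ".mp4") -> list[tuple[str, str]]:
    """Return (bucket, key) pairs for uploaded objects with the given extension."""
    videos = []
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        # Keys in S3 event notifications are URL-encoded.
        key = unquote_plus(record["s3"]["object"]["key"])
        if os.path.splitext(key)[1].lower() == extension:
            videos.append((bucket, key))
    return videos


# Made-up sample event with one video and one unrelated file.
sample_event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "example-video-bucket"},
                "object": {"key": "clips/test+video.mp4"},
            }
        },
        {
            "s3": {
                "bucket": {"name": "example-video-bucket"},
                "object": {"key": "notes.txt"},
            }
        },
    ]
}
print(find_videos(sample_event))
```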
2.9 `lambda` code walkthrough
2.9.1 `start_rekognition_lambda` code

    import boto3
    import os
    import logging

    logging.getLogger().setLevel(logging.INFO)
    logger = logging.getLogger(__name__)

    rekognition = boto3.client("rekognition")
    s3 = boto3.client("s3")

    SNS_TOPIC_ARN = os.environ["SNS_TOPIC_ARN"]
    SNS_ROLE_ARN = os.environ["SNS_ROLE_ARN"]


    def lambda_handler(event, context):
        # Bucket name and object key of the uploaded file
        key = event["Records"][0]["s3"]["object"]["key"]
        bucket_name = event["Records"][0]["s3"]["bucket"]["name"]
        file_extension = os.path.splitext(key)[1]
        logger.info("key is " + key)
        logger.info("file_extension is " + file_extension)
        if file_extension == ".mp4":
            # Start the asynchronous job; completion is announced on the SNS topic
            response = rekognition.start_celebrity_recognition(
                Video={"S3Object": {"Bucket": bucket_name, "Name": key}},
                NotificationChannel={
                    "SNSTopicArn": SNS_TOPIC_ARN,
                    "RoleArn": SNS_ROLE_ARN,
                },
            )
            logger.info("recognition has been started.")
            # Kept from the original source; the result is not used here
            recognition = rekognition.get_celebrity_recognition(JobId=response["JobId"])
            return {"statusCode": 200, "body": "Rekognition job started!"}
        return {"statusCode": 200, "body": "No video uploaded"}

Note that every `print` here has been changed to `logger.info`, because the `print` output did not show up in `cloudwatch` in this test.
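S3 can occasionally deliver the same event more than once, which would start a second job for the same video. One possible way to guard against that, sketched below, is to build the request arguments separately and add a `ClientRequestToken`, an optional parameter of `start_celebrity_recognition` intended to make repeated start requests idempotent. The function `build_start_request` and all values in the usage lines are hypothetical and not part of the original project.

```python
import hashlib


def build_start_request(bucket: str, key: str, topic_arn: str, role_arn: str) -> dict:
    """Build keyword arguments for rekognition.start_celebrity_recognition."""
    # A SHA-256 hex digest is 64 characters of [0-9a-f], which fits the
    # token's length and character limits.
    token = hashlib.sha256(f"{bucket}/{key}".encode("utf-8")).hexdigest()
    return {
        "Video": {"S3Object": {"Bucket": bucket, "Name": key}},
        "NotificationChannel": {"SNSTopicArn": topic_arn, "RoleArn": role_arn},
        "ClientRequestToken": token,
    }


# Placeholder values, for illustration only.
request = build_start_request(
    "example-video-bucket",
    "test.mp4",
    "arn:aws:sns:us-east-1:123456789012:example-topic",
    "arn:aws:iam::123456789012:role/ExampleRekognitionRole",
)
# In the lambda this would be used as:
#     rekognition.start_celebrity_recognition(**request)
print(request["ClientRequestToken"])
```

Because the token here depends only on the bucket and key, re-uploading a different video under the same name would produce the same token. Mixing in something that changes per upload would avoid that, at the cost of a slightly longer helper.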
2.9.2 `process_video_lambda` code

    import json
    import logging

    import boto3

    sns = boto3.client("sns")
    rekognition = boto3.client("rekognition")

    logging.getLogger().setLevel(logging.INFO)
    logger = logging.getLogger(__name__)


    def lambda_handler(event, context):
        logger.info("received event.")
        for record in event["Records"]:
            # Get the Rekognition job status and job ID from the SNS message
            sns_message = json.loads(record["Sns"]["Message"])
            rekognition_job_status = sns_message["Status"]
            logger.info("rekognition job status is " + rekognition_job_status)
            rekognition_job_id = sns_message["JobId"]
            if rekognition_job_status == "SUCCEEDED":
                try:
                    # Get celebrity recognition results based on the job ID
                    celebrity_recognition_result = rekognition.get_celebrity_recognition(
                        JobId=rekognition_job_id
                    )
                    # Process celebrity recognition results
                    celebrities = celebrity_recognition_result["Celebrities"]
                    for celebrity in celebrities:
                        celebrity_name = celebrity["Celebrity"]["Name"]
                        confidence = celebrity["Celebrity"]["Confidence"]
                        logger.info(
                            f"Celebrity Name: {celebrity_name}, Confidence: {confidence}%"
                        )
                except Exception as e:
                    logger.info(f"Error processing celebrity recognition results: {str(e)}")
            elif rekognition_job_status == "FAILED":
                # Handle the case where the Rekognition job failed
                logger.info(f"Rekognition job {rekognition_job_id} failed.")
            else:
                # Handle other job status or ignore it as needed
                logger.info(
                    f"Rekognition job {rekognition_job_id} is in status: {rekognition_job_status}"
                )

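The handler above reads a single response from `get_celebrity_recognition`. For longer videos the response can be split across pages linked by `NextToken`, so a sketch of a paginating helper may be useful. `iter_celebrities` and `FakeRekognition` are hypothetical names; the fake client exists only so the sketch runs locally without AWS credentials, and its page contents are invented.

```python
from typing import Any, Iterator


def iter_celebrities(client: Any, job_id: str, page_size: int = 100) -> Iterator[dict]:
    """Yield every entry of "Celebrities" across all result pages of a job."""
    kwargs = {"JobId": job_id, "MaxResults": page_size}
    while True:
        page = client.get_celebrity_recognition(**kwargs)
        yield from page.get("Celebrities", [])
        token = page.get("NextToken")
        if not token:
            break
        kwargs["NextToken"] = token


class FakeRekognition:
    """Hypothetical stand-in for boto3.client("rekognition"), for local runs only."""

    def __init__(self, pages: list[list[dict]]) -> None:
        self._pages = pages

    def get_celebrity_recognition(self, **kwargs: Any) -> dict:
        # The fake uses the page index as its token.
        index = int(kwargs.get("NextToken", 0))
        page: dict = {"Celebrities": self._pages[index]}
        if index + 1 < len(self._pages):
            page["NextToken"] = str(index + 1)
        return page


# Invented two-page result set.
fake = FakeRekognition(
    [
        [{"Celebrity": {"Name": "Person A", "Confidence": 99.0}}],
        [{"Celebrity": {"Name": "Person B", "Confidence": 90.0}}],
    ]
)
names = [item["Celebrity"]["Name"] for item in iter_celebrities(fake, "example-job-id")]
print(names)
```

In the lambda, the real `rekognition` client would be passed in place of `fake`.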
3. Deployment and execution
3.1 Test `video`
A test video of Trump is used here.
3.2 Deploying

    cdk --require-approval never deploy

3.3 Video analysis
3.3.1 Uploading the video
Upload the test video.
3.3.3 Checking the results in `cloudwatch`
Three celebrities were recognized here:
- Donald Trump, Confidence: 99.97700500488281%
- Eli Manning, Confidence: 99.83112335205078%
- Darrell Hammond, Confidence: 85.286651611328%
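Rekognition reports detections per timestamp, so the same name can appear many times in the raw results. A small post-processing sketch, assuming one only wants the highest confidence per person with rounded output, could look like this. The function names are hypothetical; the sample list reuses the three values logged above plus one invented duplicate to show the collapsing.

```python
def best_confidence_by_name(detections: list[dict]) -> dict[str, float]:
    """Keep the highest confidence seen for each celebrity name."""
    best: dict[str, float] = {}
    for item in detections:
        name = item["Celebrity"]["Name"]
        confidence = item["Celebrity"]["Confidence"]
        if confidence > best.get(name, 0.0):
            best[name] = confidence
    return best


def format_summary(best: dict[str, float]) -> list[str]:
    """Format one line per celebrity, highest confidence first."""
    ranked = sorted(best.items(), key=lambda pair: pair[1], reverse=True)
    return [f"{name}: {confidence:.2f}%" for name, confidence in ranked]


detections = [
    {"Celebrity": {"Name": "Donald Trump", "Confidence": 99.97700500488281}},
    {"Celebrity": {"Name": "Eli Manning", "Confidence": 99.83112335205078}},
    {"Celebrity": {"Name": "Darrell Hammond", "Confidence": 85.286651611328}},
    # Invented duplicate, not taken from the real log.
    {"Celebrity": {"Name": "Donald Trump", "Confidence": 97.5}},
]
for line in format_summary(best_confidence_by_name(detections)):
    print(line)
```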
3.3.4 cleanup stack
- Delete the files in the `S3 bucket`
- Clean up the stack

    cdk destroy
