当前位置: 首页> 健康> 母婴 > Redis的一个典型应用

Redis的一个典型应用

时间:2025/8/23 13:47:23来源:https://blog.csdn.net/twicave/article/details/140266657 浏览次数:0次

1.redis服务器与python编程环境

#install server

sudo apt update

sudo apt install redis-server

#install python api

pip install redis --timeout 200 -i http://mirrors.aliyun.com/pypi/simple/  --trusted-host mirrors.aliyun.com 

1.1 测试代码

# 创建Redis客户端实例
r = redis.Redis(host='localhost', port=6379, db=0)# 设置键值对
r.set('key', 'value')# 获取值
value = r.get('key')
print(value)  # 输出: b'value',b表示bytes类型# 删除键
r.delete('key')

2.接口封装1:命令行sh脚本的模式

使用redis缺省配置,就可以达到异常关机后,队列数据自动复原 

2.1 enqueue.sh

#!/bin/bash
# usage: enqueue key1 value1 5
# key1, push value1, the queue max length is 5.
# 陈旧数据会被抛弃# 设置 Redis 连接信息
REDIS_CLI="redis-cli"  # 替换为你的 redis-cli 路径,或者如果已在 PATH 中,直接使用 redis-cli
REDIS_HOST="localhost"          # Redis 服务器主机名或 IP
REDIS_PORT="6379"               # Redis 服务器端口号
KEY=$1                          # 要设置的 Redis 键名
VALUE=$2                        # 要设置的 Redis 值
MAX_LENGTH=$3                   # 最大长度# 检查列表当前长度
current_length=$(redis-cli -h $REDIS_HOST -p $REDIS_PORT LLEN $KEY)# 如果列表长度小于最大长度,则添加新元素
if [ $current_length -gt $MAX_LENGTH ]; then$REDIS_CLI -h $REDIS_HOST -p $REDIS_PORT RPOP $KEY
fi# 使用 redis-cli 执行 SET 命令
$REDIS_CLI -h $REDIS_HOST -p $REDIS_PORT LPUSH $KEY $VALUE

2.2 fetch_top.sh

#!/bin/bash
#usage: fetch_top key1
#得到某个队列的最新值# 设置 Redis 连接信息
REDIS_CLI="redis-cli"  # 替换为你的 redis-cli 路径,或者如果已在 PATH 中,直接使用 redis-cli
REDIS_HOST="localhost"          # Redis 服务器主机名或 IP
REDIS_PORT="6379"               # Redis 服务器端口号
KEY="$1"                    # Redis 列表键名# 使用 redis-cli 执行 LINDEX 命令获取列表的首个元素
FIRST_ELEMENT=$($REDIS_CLI -h $REDIS_HOST -p $REDIS_PORT LINDEX $KEY 0)# 检查 LINDEX 命令执行结果
if [ -n "$FIRST_ELEMENT" ]; thenecho "$FIRST_ELEMENT"
elseecho ""
fi

 2.3 dequeue.sh 

#!/bin/bash
#将当前队列最新的元素丢弃# 设置 Redis 连接信息
REDIS_CLI="redis-cli"  # 替换为你的 redis-cli 路径,或者如果已在 PATH 中,直接使用 redis-cli
REDIS_HOST="localhost"          # Redis 服务器主机名或 IP
REDIS_PORT="6379"               # Redis 服务器端口号
KEY="$1"                    # Redis 列表键名# 使用 redis-cli 执行 LINDEX 命令获取列表的首个元素
FIRST_ELEMENT=$($REDIS_CLI -h $REDIS_HOST -p $REDIS_PORT LPOP $KEY)# 检查 LINDEX 命令执行结果
if [ -n "$FIRST_ELEMENT" ]; thenecho 0
elseecho 1
fi

3.接口封装2:python的格式,包含结构化对象的定义

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# 获取当前脚本文件所在目录的父目录,并构建相对路径
import os
import sys
current_dir = os.path.dirname(os.path.abspath(__file__))
project_path = os.path.join(current_dir, '..')
sys.path.append(project_path)
sys.path.append(current_dir)
import datetime
import paho.mqtt.client as mqtt
import cat4Config
import struct
import subprocess
import time
import json
import queue
import numpy as np
import threading
import subprocess
import pickle
import redisclass GpPublishMsg:def __init__(self, time, ar_class, pic, addr_web_api):self.time = timeself.ar_class = ar_class,self.pic = picself.addr_web_api = addr_web_apiclass GpPublicMsgQueue:def __init__(self):self.MAX_QUEUE_LEN = 3600self.errCnt = 0self.keyname = current_dir.replace('/', ':').replace('\\', ':').replace('.', ':')def enqueue(self, msg:GpPublishMsg):try:# 创建Redis客户端实例r = redis.Redis(host='localhost', port=6379, db=0)size = r.llen(self.keyname)if(size>self.MAX_QUEUE_LEN):r.rpop(self.keyname)r.lpush(self.keyname, pickle.dumps(msg))except:self.errCnt = self.errCnt+1def dequeue(self) -> GpPublishMsg:try:# 创建Redis客户端实例r = redis.Redis(host='localhost', port=6379, db=0)br = r.lpop(self.keyname)if(br is not None):return pickle.loads(br)else:return Noneexcept:self.errCnt = self.errCnt+1return Nonedef fetch_queue_top(self) -> GpPublishMsg:try:# 创建Redis客户端实例r = redis.Redis(host='localhost', port=6379, db=0)size = r.llen(self.keyname)if(size==0):return Nonebr = r.lindex(self.keyname, 0)if(br is not None):return pickle.loads(br)else:return Noneexcept:self.errCnt = self.errCnt+1return Nonedef test(self):item1 = GpPublishMsg(2024, 'class1', 'pic1', "http://1")item2 = GpPublishMsg(2024, 'class1', 'pic2', "http://1")item3 = GpPublishMsg(2024, 'class1', 'pic3', "http://1")dumb = self.fetch_queue_top()print('before first enqueue, top() = ', dumb)self.enqueue(item1)print('enqueue item1, top() = ', self.fetch_queue_top().pic)self.enqueue(item2)print('enqueue item2, top() = ', self.fetch_queue_top().pic)self.dequeue()print('first dequeue, top() = ', self.fetch_queue_top().pic)self.enqueue(item3)print('enqueue(item3), top() = ', self.fetch_queue_top().pic)self.dequeue()print('dequeue(), top() = ', self.fetch_queue_top().pic)self.dequeue()print('dequeue(), top() = ', self.fetch_queue_top())

3.1 python封装测试

'''
>>> import ext_pic_out_offline_queue as gpqueue
>>> q = gpqueue.GpPublicMsgQueue()
>>> q.test()
before first enqueue, top() =  None
enqueue item1, top() =  pic1
enqueue item2, top() =  pic2
first dequeue, top() =  pic1
enqueue(item3), top() =  pic3
dequeue(), top() =  pic1
dequeue(), top() =  None
'''def test(self):item1 = GpPublishMsg(2024, 'class1', 'pic1', "http://1")item2 = GpPublishMsg(2024, 'class1', 'pic2', "http://1")item3 = GpPublishMsg(2024, 'class1', 'pic3', "http://1")dumb = self.fetch_queue_top()print('before first enqueue, top() = ', dumb)self.enqueue(item1)print('enqueue item1, top() = ', self.fetch_queue_top().pic)self.enqueue(item2)print('enqueue item2, top() = ', self.fetch_queue_top().pic)self.dequeue()print('first dequeue, top() = ', self.fetch_queue_top().pic)self.enqueue(item3)print('enqueue(item3), top() = ', self.fetch_queue_top().pic)self.dequeue()print('dequeue(), top() = ', self.fetch_queue_top().pic)self.dequeue()print('dequeue(), top() = ', self.fetch_queue_top())

4.为什么要用redis

在编程模式上,订阅模式在分布式环境的一个实现形式,就是mqtt。它可以非常方便地处理消息分发。甚至可以将这个应用添加进嵌入式系统——这个分布式协同机制的开销极小。但是mqtt不负责持久化。顶多,它会保留同一个topic下的最后一笔数据,它不进行数据保存,只是把信息分发给当前在线的用户。

如果你需要一个存储缓冲的机制,那么redis就是非常适合的选择。

你可以手工实现,但是它有代价——你无法方便地调试,然后你会遇到重复发明轮子过程中必然遭遇的各种技术细节。

关键字:Redis的一个典型应用

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

责任编辑: