运行测试命令是 python main.py

注意对于我们的macos linux 需要明确的指定我们的版本讷,这里进行区分一下吧

Python 数据类型

  • 获取数据类型的方式

    • type()

      • 实现返回的数据格式为 <class 'str'> <class 'int'>

    • isinstance()

      • 判断的是我们的什么是什么的实例来进行讨论的吧

  • 基础数据类型

    • 整数类型 int

    • 浮点数类型: float

    • 字符串类型: str

  • 标准的数据类型有

    • Nunber 数字

    • String 字符串

    • bool 布尔类型

    • List 列表

    • Tuple 元组类型

    • Set 集合类型

    • Dictionary 字典类型

    • Bytes 不可变的二进制数据吧

  • 六个基本的数据类型的分类

    • 基础数据类型(不可变类型): Number String Tuple bool

    • 可变数据类型(引用数据类型):List Set Dictionary

对比实现:就是我们的 python 的话和 javascript 是一样的内存管理模型吧

但是也是具备差异化的讷

  • 就是我们的javascript的字符串的操作的话是值类型的,进行修改后的操作:拼接,+ 等操作,还是一个内存地址的引用

  • 但是 python 的 string 是一个引用的,不可变得,此时修改后对应的内存地址也会发生变化讷

  • 所以说 python 的数据类型的分类的话核心是进行的是可变和不可变的区分吧

#! /usr/bin/python3

# 实现书写我们的python 的命令行脚本吧
print(isinstance("", str)) # True
print(type("")) # <class 'str'>

Python 运算符

  • 位运算符

  • 逻辑运算符

  • 成员运算符

  • 成员运算符

Python 数字

Python 字符串

序号

方法及描述

1

capitalize()
将字符串的第一个字符转换为大写

2

center(width, fillchar)

返回一个指定的宽度 width 居中的字符串,fillchar 为填充的字符,默认为空格。

3

count(str, beg= 0,end=len(string))


返回 str 在 string 里面出现的次数,如果 beg 或者 end 指定则返回指定范围内 str 出现的次数

4

bytes.decode(encoding="utf-8", errors="strict")


Python3 中没有 decode 方法,但我们可以使用 bytes 对象的 decode() 方法来解码给定的 bytes 对象,这个 bytes 对象可以由 str.encode() 来编码返回。

5

encode(encoding='UTF-8',errors='strict')


以 encoding 指定的编码格式编码字符串,如果出错默认报一个ValueError 的异常,除非 errors 指定的是'ignore'或者'replace'

6

endswith(suffix, beg=0, end=len(string))
检查字符串是否以 suffix 结束,如果 beg 或者 end 指定则检查指定的范围内是否以 suffix 结束,如果是,返回 True,否则返回 False。

7

expandtabs(tabsize=8)


把字符串 string 中的 tab 符号转为空格,tab 符号默认的空格数是 8 。

8

find(str, beg=0, end=len(string))


检测 str 是否包含在字符串中,如果指定范围 beg 和 end ,则检查是否包含在指定范围内,如果包含返回开始的索引值,否则返回-1

9

index(str, beg=0, end=len(string))


跟find()方法一样,只不过如果str不在字符串中会报一个异常。

10

isalnum()


检查字符串是否由字母和数字组成,即字符串中的所有字符都是字母或数字。如果字符串至少有一个字符,并且所有字符都是字母或数字,则返回 True;否则返回 False。

11

isalpha()


如果字符串至少有一个字符并且所有字符都是字母或中文字则返回 True, 否则返回 False

12

isdigit()


如果字符串只包含数字则返回 True 否则返回 False..

13

islower()


如果字符串中包含至少一个区分大小写的字符,并且所有这些(区分大小写的)字符都是小写,则返回 True,否则返回 False

14

isnumeric()


如果字符串中只包含数字字符,则返回 True,否则返回 False

15

isspace()


如果字符串中只包含空白,则返回 True,否则返回 False.

16

istitle()


如果字符串是标题化的(见 title())则返回 True,否则返回 False

17

isupper()


如果字符串中包含至少一个区分大小写的字符,并且所有这些(区分大小写的)字符都是大写,则返回 True,否则返回 False

18

join(seq)


以指定字符串作为分隔符,将 seq 中所有的元素(的字符串表示)合并为一个新的字符串

19

len(string)


返回字符串长度

20

ljust(width[, fillchar])


返回一个原字符串左对齐,并使用 fillchar 填充至长度 width 的新字符串,fillchar 默认为空格。

21

lower()


转换字符串中所有大写字符为小写.

22

lstrip()


截掉字符串左边的空格或指定字符。

23

maketrans()


创建字符映射的转换表,对于接受两个参数的最简单的调用方式,第一个参数是字符串,表示需要转换的字符,第二个参数也是字符串表示转换的目标。

24

max(str)


返回字符串 str 中最大的字母。

25

min(str)


返回字符串 str 中最小的字母。

26

replace(old, new [, max])


把 将字符串中的 old 替换成 new,如果 max 指定,则替换不超过 max 次。

27

rfind(str, beg=0,end=len(string))


类似于 find()函数,不过是从右边开始查找.

28

rindex( str, beg=0, end=len(string))


类似于 index(),不过是从右边开始.

29

rjust(width,[, fillchar])


返回一个原字符串右对齐,并使用fillchar(默认空格)填充至长度 width 的新字符串

30

rstrip()


删除字符串末尾的空格或指定字符。

31

split(str="", num=string.count(str))


以 str 为分隔符截取字符串,如果 num 有指定值,则仅截取 num+1 个子字符串

32

splitlines([keepends])


按照行('\r', '\r\n', \n')分隔,返回一个包含各行作为元素的列表,如果参数 keepends 为 False,不包含换行符,如果为 True,则保留换行符。

33

startswith(substr, beg=0,end=len(string))


检查字符串是否是以指定子字符串 substr 开头,是则返回 True,否则返回 False。如果beg 和 end 指定值,则在指定范围内检查。

34

strip([chars])


在字符串上执行 lstrip()和 rstrip()

35

swapcase()


将字符串中大写转换为小写,小写转换为大写

36

title()


返回"标题化"的字符串,就是说所有单词都是以大写开始,其余字母均为小写(见 istitle())

37

translate(table, deletechars="")


根据 table 给出的表(包含 256 个字符)转换 string 的字符, 要过滤掉的字符放到 deletechars 参数中

38

upper()


转换字符串中的小写字母为大写

39

zfill (width)


返回长度为 width 的字符串,原字符串右对齐,前面填充0

40

isdecimal()


检查字符串是否只包含十进制字符,如果是返回 true,否则返回 false。

Python List列表

序号

函数

1

len(list)
列表元素个数

2

max(list)
返回列表元素最大值

3

min(list)
返回列表元素最小值

4

list(seq)
将元组转换为列表

序号

方法

1

list.append(obj)
在列表末尾添加新的对象

2

list.count(obj)
统计某个元素在列表中出现的次数

3

list.extend(seq)
在列表末尾一次性追加另一个序列中的多个值(用新列表扩展原来的列表)

4

list.index(obj)
从列表中找出某个值第一个匹配项的索引位置

5

list.insert(index, obj)
将对象插入列表

6

list.pop([index=-1])
移除列表中的一个元素(默认最后一个元素),并且返回该元素的值

7

list.remove(obj)
移除列表中某个值的第一个匹配项

8

list.reverse()
反向列表中元素

9

list.sort( key=None, reverse=False)
对原列表进行排序

10

list.clear()
清空列表

11

list.copy()
复制列表

Python 元组

  • 元组是一个不可变的数据类型,所以说只是提供了对应的默认的属性操作吧

序号

方法及描述

实例

1

len(tuple)
计算元组元素个数。

>>> tuple1 = ('Google', 'Runoob', 'Taobao')
>>> len(tuple1)
3
>>> 

2

max(tuple)
返回元组中元素最大值。

>>> tuple2 = ('5', '4', '8')
>>> max(tuple2)
'8'
>>> 

3

min(tuple)
返回元组中元素最小值。

>>> tuple2 = ('5', '4', '8')
>>> min(tuple2)
'4'
>>> 

4

tuple(iterable)
将可迭代系列转换为元组。

>>> list1= ['Google', 'Taobao', 'Runoob', 'Baidu']
>>> tuple1=tuple(list1)
>>> tuple1
('Google', 'Taobao', 'Runoob', 'Baidu')

Python 字典

序号

函数及描述

实例

1

len(dict)
计算字典元素个数,即键的总数。

>>> tinydict = {'Name': 'Runoob', 'Age': 7, 'Class': 'First'}
>>> len(tinydict)
3

2

str(dict)
输出字典,可以打印的字符串表示。

>>> tinydict = {'Name': 'Runoob', 'Age': 7, 'Class': 'First'}
>>> str(tinydict)
"{'Name': 'Runoob', 'Class': 'First', 'Age': 7}"

3

type(variable)
返回输入的变量类型,如果变量是字典就返回字典类型。

>>> tinydict = {'Name': 'Runoob', 'Age': 7, 'Class': 'First'}
>>> type(tinydict)
<class 'dict'>

序号

函数及描述

1

dict.clear()
删除字典内所有元素

2

dict.copy()
返回一个字典的浅复制

3

dict.fromkeys()
创建一个新字典,以序列seq中元素做字典的键,val为字典所有键对应的初始值

4

dict.get(key, default=None)
返回指定键的值,如果键不在字典中返回 default 设置的默认值

5

key in dict
如果键在字典dict里返回true,否则返回false

6

dict.items()
以列表返回一个视图对象

7

dict.keys()
返回一个视图对象

8

dict.setdefault(key, default=None)
和get()类似, 但如果键不存在于字典中,将会添加键并将值设为default

9

dict.update(dict2)
把字典dict2的键/值对更新到dict里

10

dict.values()
返回一个视图对象

11

dict.pop(key[,default])
删除字典 key(键)所对应的值,返回被删除的值。

12

dict.popitem()
返回并删除字典中的最后一对键和值。

Python 推导式

  • 实现的就是我们的过滤掉一些长度小于等于3的一些 element ,其他的直接进行转化为大写的操作吧

>>> names = ['Bob','Tom','alice','Jerry','Wendy','Smith']
>>> new_names = [name.upper()for name in names if len(name)>3]
>>> print(new_names)
['ALICE', 'JERRY', 'WENDY', 'SMITH']
  • 得到0-30 的整除3的数组吧

>>> multiples = [i for i in range(30) if i % 3 == 0]
>>> print(multiples)
[0, 3, 6, 9, 12, 15, 18, 21, 24, 27]
listdemo = ['Google','Runoob', 'Taobao']
# 将列表中各字符串值为键,各字符串的长度为值,组成键值对
>>> newdict = {key:len(key) for key in listdemo}
>>> newdict
{'Google': 6, 'Runoob': 6, 'Taobao': 6}

Python 迭代器和生成器

  • 核心的 api

    • iter()

    • __iter__()

    • next()

    • __next__()

  • 好处

    • 节省内存:不需要一次性加载所有数据,而是按需生成或获取。

    • 性能优化:可以在需要的时候才进行计算,避免不必要的计算。

    • 可中断性:可以在任何点停止,比如我们可以在遍历到某个条件时提前终止,而不必处理整个数据集。

def infinite_sequence():
    num = 0
    while True:
        yield num
        num += 1

# 可以无限迭代,但通常不会真的无限迭代,会设置终止条件
for i in infinite_sequence():
    if i > 100:
        break
    print(i)

def chunk_reader(file_path, chunk_size=1024):
    """分块读取文件"""
    with open(file_path, 'rb') as file:
        while True:
            chunk = file.read(chunk_size)
            if not chunk:
                break
            yield chunk

# 使用示例
for chunk in chunk_reader('large_binary_file.bin', 8192):
    process_chunk(chunk)

数据库查询优化

import psycopg2
from psycopg2 import pool
from contextlib import contextmanager
import json

class DatabaseStreamProcessor:
    def __init__(self, db_config, pool_size=10):
        self.db_pool = psycopg2.pool.SimpleConnectionPool(
            1, pool_size, **db_config
        )
    
    @contextmanager
    def get_cursor(self):
        conn = self.db_pool.getconn()
        try:
            yield conn.cursor()
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            self.db_pool.putconn(conn)
    
    def stream_large_query(self, query, params=None, chunk_size=1000):
        """流式处理大型查询结果,避免内存溢出"""
        with self.get_cursor() as cursor:
            cursor.execute(query, params or ())
            
            while True:
                rows = cursor.fetchmany(chunk_size)
                if not rows:
                    break
                
                # 对每一批数据进行处理
                processed_chunk = self._process_rows(rows)
                yield from processed_chunk
    
    def _process_rows(self, rows):
        """处理单批数据,可以自定义业务逻辑"""
        for row in rows:
            # 模拟复杂的数据转换逻辑
            processed = {
                'id': row[0],
                'data': json.loads(row[1]) if row[1] else {},
                'timestamp': row[2].isoformat() if row[2] else None,
                'processed_at': datetime.now().isoformat()
            }
            yield processed
    
    def batch_update_generator(self, data_stream, batch_size=500):
        """将数据流分批用于批量更新"""
        batch = []
        for item in data_stream:
            batch.append(item)
            if len(batch) >= batch_size:
                yield batch
                batch = []
        
        if batch:  # 处理最后一批
            yield batch
    
    def process_user_analytics(self, start_date, end_date):
        """完整的用户分析数据处理管道"""
        # 1. 流式读取用户行为数据
        query = """
            SELECT user_id, activity_data, created_at 
            FROM user_activities 
            WHERE created_at BETWEEN %s AND %s
            ORDER BY created_at
        """
        
        raw_stream = self.stream_large_query(query, (start_date, end_date))
        
        # 2. 数据清洗和过滤
        cleaned_stream = (
            item for item in raw_stream 
            if item['data'].get('type') in ['purchase', 'view', 'click']
        )
        
        # 3. 数据聚合(按用户分组)
        from collections import defaultdict
        user_aggregates = defaultdict(lambda: {
            'purchase_count': 0,
            'view_count': 0,
            'click_count': 0,
            'total_value': 0.0
        })
        
        for item in cleaned_stream:
            user_id = item['id']
            activity_type = item['data'].get('type')
            value = item['data'].get('value', 0)
            
            if activity_type == 'purchase':
                user_aggregates[user_id]['purchase_count'] += 1
                user_aggregates[user_id]['total_value'] += float(value)
            elif activity_type == 'view':
                user_aggregates[user_id]['view_count'] += 1
            elif activity_type == 'click':
                user_aggregates[user_id]['click_count'] += 1
        
        # 4. 生成最终分析结果
        for user_id, aggregates in user_aggregates.items():
            yield {
                'user_id': user_id,
                **aggregates,
                'conversion_rate': (
                    aggregates['purchase_count'] / aggregates['view_count'] 
                    if aggregates['view_count'] > 0 else 0
                )
            }

# 使用示例
db_config = {
    'host': 'localhost',
    'database': 'analytics',
    'user': 'user',
    'password': 'password'
}

processor = DatabaseStreamProcessor(db_config)

# 处理大量用户数据
for user_stats in processor.process_user_analytics(
    start_date='2024-01-01', 
    end_date='2024-01-31'
):
    # 可以实时发送到消息队列或写入另一个数据库
    print(f"User {user_stats['user_id']}: {user_stats}")

服务器中间管道

import asyncio
from contextvars import ContextVar
from typing import Any, Callable, Awaitable, Generator

# 上下文变量,用于在中间件间传递数据
request_id_var = ContextVar('request_id', default=None)

class AsyncMiddlewarePipeline:
    def __init__(self):
        self.middlewares = []
    
    def add_middleware(self, middleware_func: Callable):
        self.middlewares.append(middleware_func)
    
    async def process_request(self, request: dict) -> dict:
        """处理HTTP请求的中间件管道"""
        # 创建处理管道
        pipeline = self._build_pipeline(request)
        
        # 执行管道
        response = await pipeline.__anext__()
        
        # 继续执行直到完成
        async for _ in pipeline:
            pass
            
        return response
    
    async def _build_pipeline(self, request: dict) -> Generator:
        """构建中间件处理管道"""
        # 初始响应
        response = {'status': 200, 'headers': {}, 'body': ''}
        
        # 按顺序执行中间件
        for middleware in self.middlewares:
            # 每个中间件都是一个异步生成器
            middleware_gen = middleware(request, response)
            try:
                # 执行中间件直到第一个 yield
                await middleware_gen.__anext__()
                
                # 中间件可以在这里修改request或response
                yield response
                
                # 继续执行中间件的剩余部分(响应处理)
                try:
                    await middleware_gen.__anext__()
                except StopAsyncIteration:
                    pass
                    
            except Exception as e:
                print(f"Middleware error: {e}")
                break

# 具体的中间件实现
async def auth_middleware(request: dict, response: dict):
    """认证中间件"""
    # 请求处理阶段
    token = request.get('headers', {}).get('authorization')
    if not token or not await validate_token(token):
        response['status'] = 401
        response['body'] = 'Unauthorized'
        return
    
    user_id = await get_user_id_from_token(token)
    request['user_id'] = user_id
    
    # 让出控制权,继续执行下一个中间件
    yield
    
    # 响应处理阶段(可以在这里添加响应头等)
    response['headers']['x-user-id'] = str(user_id)

async def logging_middleware(request: dict, response: dict):
    """日志记录中间件"""
    import uuid
    request_id = str(uuid.uuid4())
    request_id_var.set(request_id)
    
    # 记录请求开始
    print(f"[{request_id}] Started {request['method']} {request['path']}")
    
    start_time = asyncio.get_event_loop().time()
    
    yield  # 执行其他中间件和业务逻辑
    
    # 记录请求完成
    duration = asyncio.get_event_loop().time() - start_time
    print(f"[{request_id}] Completed in {duration:.3f}s - {response['status']}")

async def rate_limit_middleware(request: dict, response: dict):
    """限流中间件"""
    client_ip = request.get('client_ip')
    
    if await is_rate_limited(client_ip):
        response['status'] = 429
        response['body'] = 'Too Many Requests'
        return
    
    yield
    
    # 更新限流计数
    await update_rate_limit(client_ip)

# 模拟的辅助函数
async def validate_token(token): return True
async def get_user_id_from_token(token): return 123
async def is_rate_limited(ip): return False
async def update_rate_limit(ip): pass

# 使用示例
async def main():
    pipeline = AsyncMiddlewarePipeline()
    pipeline.add_middleware(logging_middleware)
    pipeline.add_middleware(auth_middleware)
    pipeline.add_middleware(rate_limit_middleware)
    
    # 模拟HTTP请求
    request = {
        'method': 'GET',
        'path': '/api/data',
        'headers': {'authorization': 'Bearer token123'},
        'client_ip': '192.168.1.1'
    }
    
    response = await pipeline.process_request(request)
    print(f"Final response: {response}")

# asyncio.run(main())

监控收集

import time
import random
from collections import defaultdict, deque
from datetime import datetime, timedelta

class MetricsAggregator:
    def __init__(self, window_size=300):  # 5分钟窗口
        self.window_size = window_size
        self.metrics_buffer = deque(maxlen=window_size)
    
    def metrics_generator(self, interval=1):
        """生成模拟的监控指标数据流"""
        while True:
            timestamp = datetime.now()
            
            # 模拟多种监控指标
            metrics = {
                'timestamp': timestamp,
                'cpu_percent': random.uniform(0, 100),
                'memory_used': random.randint(1000, 8000),
                'disk_io': random.randint(0, 1000),
                'network_rx': random.randint(0, 1000000),
                'network_tx': random.randint(0, 1000000),
                'active_connections': random.randint(0, 10000)
            }
            
            yield metrics
            time.sleep(interval)
    
    def sliding_window_aggregator(self, metrics_stream, window_seconds=60):
        """滑动窗口聚合器"""
        window = deque()
        
        for metric in metrics_stream:
            current_time = metric['timestamp']
            
            # 移除过期的数据点
            while (window and 
                  (current_time - window[0]['timestamp']).total_seconds() > window_seconds):
                window.popleft()
            
            window.append(metric)
            
            if len(window) > 0:
                # 计算窗口内的聚合指标
                aggregated = self._aggregate_window(list(window))
                yield aggregated
    
    def _aggregate_window(self, window_data):
        """聚合窗口内的数据"""
        if not window_data:
            return {}
        
        aggregated = {
            'window_start': window_data[0]['timestamp'],
            'window_end': window_data[-1]['timestamp'],
            'sample_count': len(window_data),
            'cpu_avg': sum(m['cpu_percent'] for m in window_data) / len(window_data),
            'cpu_max': max(m['cpu_percent'] for m in window_data),
            'memory_avg': sum(m['memory_used'] for m in window_data) / len(window_data),
            'disk_io_avg': sum(m['disk_io'] for m in window_data) / len(window_data),
            'network_rx_total': sum(m['network_rx'] for m in window_data),
            'network_tx_total': sum(m['network_tx'] for m in window_data),
            'connections_avg': sum(m['active_connections'] for m in window_data) / len(window_data)
        }
        
        # 检测异常(简单的阈值检测)
        aggregated['anomalies'] = self._detect_anomalies(aggregated)
        
        return aggregated
    
    def _detect_anomalies(self, aggregated):
        """检测指标异常"""
        anomalies = []
        
        if aggregated['cpu_avg'] > 80:
            anomalies.append('high_cpu')
        if aggregated['cpu_max'] > 95:
            anomalies.append('cpu_spike')
        if aggregated['memory_avg'] > 7000:
            anomalies.append('high_memory')
        if aggregated['connections_avg'] > 8000:
            anomalies.append('high_connections')
        
        return anomalies
    
    def multi_level_aggregation(self, raw_metrics_stream):
        """多级聚合:原始数据 -> 1秒窗口 -> 1分钟窗口 -> 5分钟窗口"""
        
        # 第一级:1秒窗口聚合
        one_sec_aggregated = self.sliding_window_aggregator(
            raw_metrics_stream, window_seconds=1
        )
        
        # 第二级:1分钟窗口聚合(基于1秒聚合结果)
        one_min_aggregated = self.sliding_window_aggregator(
            one_sec_aggregated, window_seconds=60
        )
        
        # 第三级:5分钟窗口聚合(基于1分钟聚合结果)
        five_min_aggregated = self.sliding_window_aggregator(
            one_min_aggregated, window_seconds=300
        )
        
        return {
            'raw': raw_metrics_stream,
            '1s': one_sec_aggregated,
            '1m': one_min_aggregated,
            '5m': five_min_aggregated
        }

# 使用示例
def monitor_system():
    aggregator = MetricsAggregator()
    
    # 生成模拟监控数据流
    raw_metrics = aggregator.metrics_generator(interval=0.1)
    
    # 获取多级聚合数据流
    aggregation_streams = aggregator.multi_level_aggregation(raw_metrics)
    
    # 处理不同粒度的数据
    import itertools
    
    for i, (one_sec, one_min, five_min) in enumerate(zip(
        aggregation_streams['1s'],
        aggregation_streams['1m'], 
        aggregation_streams['5m']
    )):
        print(f"\n--- Sample {i} ---")
        print(f"1s聚合: CPU={one_sec['cpu_avg']:.1f}%")
        print(f"1m聚合: CPU={one_min['cpu_avg']:.1f}%, 异常={one_min['anomalies']}")
        print(f"5m聚合: CPU={five_min['cpu_avg']:.1f}%")
        
        if i >= 10:  # 只显示前10个样本
            break

# monitor_system()

分布式协调

import threading
import queue
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List, Dict, Any, Generator

class DistributedTaskCollector:
    def __init__(self, worker_nodes: List[str], max_workers=10):
        self.worker_nodes = worker_nodes
        self.task_queue = queue.Queue()
        self.result_queue = queue.Queue()
        self.max_workers = max_workers
        
    def submit_tasks(self, tasks: List[Dict]):
        """提交任务到队列"""
        for task in tasks:
            self.task_queue.put(task)
        
        # 添加结束标记
        for _ in range(len(self.worker_nodes)):
            self.task_queue.put(None)
    
    def worker(self, node_id: str) -> Generator[Dict, None, None]:
        """工作节点模拟,返回结果生成器"""
        while True:
            try:
                task = self.task_queue.get(timeout=1)
                if task is None:
                    break
                
                # 模拟任务处理
                result = self._process_task(task, node_id)
                yield result
                
            except queue.Empty:
                continue
    
    def _process_task(self, task: Dict, node_id: str) -> Dict:
        """模拟任务处理"""
        # 模拟处理时间
        processing_time = random.uniform(0.1, 2.0)
        time.sleep(processing_time)
        
        # 模拟任务结果
        return {
            'task_id': task['id'],
            'node_id': node_id,
            'result': f"Processed {task['data']}",
            'processing_time': processing_time,
            'success': random.random() > 0.1,  # 90%成功率
            'timestamp': datetime.now()
        }
    
    def result_collector(self) -> Generator[Dict, None, None]:
        """收集所有工作节点的结果"""
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # 启动所有工作节点
            future_to_node = {
                executor.submit(self._run_worker, node): node
                for node in self.worker_nodes
            }
            
            # 收集结果
            completed = 0
            while completed < len(self.worker_nodes):
                for future in future_to_node:
                    if future.done():
                        try:
                            # 获取工作节点的所有结果
                            for result in future.result():
                                yield result
                        except Exception as e:
                            yield {
                                'error': str(e),
                                'node_id': future_to_node[future],
                                'timestamp': datetime.now()
                            }
                        completed += 1
                        break
                time.sleep(0.1)
    
    def _run_worker(self, node_id: str) -> List[Dict]:
        """运行工作节点并收集其所有结果"""
        return list(self.worker(node_id))
    
    def real_time_aggregator(self, result_stream, window_size=100):
        """实时聚合任务执行结果"""
        results_window = deque(maxlen=window_size)
        stats = {
            'total_processed': 0,
            'success_count': 0,
            'failure_count': 0,
            'total_processing_time': 0.0,
            'node_stats': defaultdict(lambda: {'processed': 0, 'success': 0})
        }
        
        for result in result_stream:
            if 'error' in result:
                stats['failure_count'] += 1
                continue
                
            results_window.append(result)
            stats['total_processed'] += 1
            
            if result['success']:
                stats['success_count'] += 1
            else:
                stats['failure_count'] += 1
                
            stats['total_processing_time'] += result['processing_time']
            
            # 更新节点统计
            node_id = result['node_id']
            stats['node_stats'][node_id]['processed'] += 1
            if result['success']:
                stats['node_stats'][node_id]['success'] += 1
            
            # 定期输出统计信息
            if stats['total_processed'] % 10 == 0:
                yield self._compute_real_time_stats(stats, list(results_window))
        
        # 最终统计
        yield self._compute_real_time_stats(stats, list(results_window))
    
    def _compute_real_time_stats(self, stats, recent_results):
        """计算实时统计信息"""
        if stats['total_processed'] == 0:
            return {}
        
        avg_processing_time = (
            stats['total_processing_time'] / stats['total_processed']
        )
        
        success_rate = (
            stats['success_count'] / stats['total_processed'] * 100
        )
        
        # 计算最近结果的趋势
        recent_success_rate = 0
        if recent_results:
            recent_success = sum(1 for r in recent_results if r.get('success'))
            recent_success_rate = recent_success / len(recent_results) * 100
        
        return {
            'timestamp': datetime.now(),
            'total_processed': stats['total_processed'],
            'success_rate': success_rate,
            'recent_success_rate': recent_success_rate,
            'avg_processing_time': avg_processing_time,
            'node_performance': {
                node: {
                    'processed': data['processed'],
                    'success_rate': (
                        data['success'] / data['processed'] * 100 
                        if data['processed'] > 0 else 0
                    )
                }
                for node, data in stats['node_stats'].items()
            }
        }

# 使用示例
def run_distributed_workload():
    # 创建任务收集器
    worker_nodes = [f'worker-{i}' for i in range(5)]
    collector = DistributedTaskCollector(worker_nodes)
    
    # 创建测试任务
    tasks = [{'id': i, 'data': f'task_data_{i}'} for i in range(50)]
    collector.submit_tasks(tasks)
    
    # 收集并实时分析结果
    result_stream = collector.result_collector()
    stats_stream = collector.real_time_aggregator(result_stream)
    
    print("开始处理分布式任务...")
    for stats in stats_stream:
        print(f"\n实时统计:")
        print(f"  总处理数: {stats['total_processed']}")
        print(f"  成功率: {stats['success_rate']:.1f}%")
        print(f"  最近成功率: {stats['recent_success_rate']:.1f}%")
        print(f"  平均处理时间: {stats['avg_processing_time']:.3f}s")
        
        print("  节点性能:")
        for node, perf in stats['node_performance'].items():
            print(f"    {node}: {perf['processed']} tasks, "
                  f"{perf['success_rate']:.1f}% success")

# run_distributed_workload()

Python with 语句

  • 基本概念是

    • with 语句进行的是包装一个代码块的执行,允许定义初始化和清理操作吧,通常用于资源管理

  • 上下文协议

    • __enter__(self) 进入 with 语句的时候实现调用的语句吧,返回的是一个对象讷,该对象一般为了方便操作 as 赋值给对应的变量

    • __exit__(self, exc_type, exc_value, tracebakc) 离开的时候实现的对应的执行函数吧,异常类型或者说异常值的跟踪吧

  • 工作原理

    • 调用上下文管理器 __enter__

    • 如果指定了 as 子句,直接进行返回值赋值的变量吧

    • 执行 with 语句代码块

    • 无论是否发生异常都会执行 __exit__ ,返回 True 就不会进行传播异常吧,否则就是有异常讷

from contextlib import contextmanager
import time
import psutil
import threading

@contextmanager
def resource_monitor(description="操作"):
    """资源监控上下文管理器"""
    process = psutil.Process()
    
    # 初始资源使用
    start_memory = process.memory_info().rss / 1024 / 1024  # MB
    start_cpu = process.cpu_percent()
    start_time = time.time()
    
    # 监控线程
    stop_monitoring = threading.Event()
    peak_memory = start_memory
    
    def monitor_resources():
        nonlocal peak_memory
        while not stop_monitoring.is_set():
            current_memory = process.memory_info().rss / 1024 / 1024
            peak_memory = max(peak_memory, current_memory)
            time.sleep(0.1)
    
    monitor_thread = threading.Thread(target=monitor_resources)
    monitor_thread.start()
    
    try:
        yield
    finally:
        stop_monitoring.set()
        monitor_thread.join()
        
        # 计算最终资源使用
        end_time = time.time()
        end_memory = process.memory_info().rss / 1024 / 1024
        end_cpu = process.cpu_percent()
        
        print(f"\n=== {description} 资源报告 ===")
        print(f"执行时间: {end_time - start_time:.3f}秒")
        print(f"内存使用: {start_memory:.1f}MB -> {end_memory:.1f}MB")
        print(f"内存峰值: {peak_memory:.1f}MB")
        print(f"CPU使用: {end_cpu:.1f}%")

@contextmanager
def query_logger(slow_query_threshold=1.0):
    """数据库查询日志上下文管理器"""
    queries = []
    start_time = time.time()
    
    def log_query(query, params=None, duration=0):
        queries.append({
            'query': query,
            'params': params,
            'duration': duration,
            'slow': duration > slow_query_threshold
        })
    
    try:
        yield log_query
    finally:
        end_time = time.time()
        total_time = end_time - start_time
        
        print(f"\n=== 查询执行报告 ===")
        print(f"总查询数: {len(queries)}")
        print(f"总执行时间: {total_time:.3f}秒")
        print(f"慢查询数: {sum(1 for q in queries if q['slow'])}")
        
        for i, query in enumerate(queries, 1):
            status = "🚨 慢查询" if query['slow'] else "✅ 正常"
            print(f"{i}. [{status}] {query['duration']:.3f}s - {query['query']}")

# 使用示例
def process_large_dataset():
    with resource_monitor("大数据处理"):
        data = []
        for i in range(100000):
            data.append({"id": i, "value": i ** 2})
        
        # 模拟耗时操作
        time.sleep(2)
        
        result = [item for item in data if item["id"] % 2 == 0]
        return result

def mock_database_operations():
    with query_logger(slow_query_threshold=0.5) as log_query:
        # 模拟查询
        queries = [
            ("SELECT * FROM users WHERE id = %s", (1,), 0.1),
            ("SELECT * FROM orders WHERE user_id = %s", (1,), 0.3),
            ("SELECT COUNT(*) FROM large_table", None, 1.2),  # 慢查询
            ("UPDATE products SET price = price * 1.1", None, 0.8),
        ]
        
        for query, params, duration in queries:
            time.sleep(duration)  # 模拟查询时间
            log_query(query, params, duration)

# process_large_dataset()
# mock_database_operations()
import tempfile
import shutil
from contextlib import contextmanager
import os

@contextmanager
def temporary_directory(prefix="tmp_"):
    """临时目录上下文管理器"""
    temp_dir = tempfile.mkdtemp(prefix=prefix)
    try:
        print(f"创建临时目录: {temp_dir}")
        yield temp_dir
    finally:
        print(f"清理临时目录: {temp_dir}")
        shutil.rmtree(temp_dir)

@contextmanager
def change_directory(path):
    """临时切换工作目录"""
    original_cwd = os.getcwd()
    try:
        os.chdir(path)
        print(f"切换到目录: {path}")
        yield
    finally:
        os.chdir(original_cwd)
        print(f"切回目录: {original_cwd}")

@contextmanager
def environment_variables(**kwargs):
    """临时设置环境变量"""
    original_env = {}
    try:
        # 保存原始值并设置新值
        for key, value in kwargs.items():
            original_env[key] = os.environ.get(key)
            os.environ[key] = str(value)
            print(f"设置环境变量: {key}={value}")
        yield
    finally:
        # 恢复原始值
        for key, original_value in original_env.items():
            if original_value is None:
                os.environ.pop(key, None)
                print(f"删除环境变量: {key}")
            else:
                os.environ[key] = original_value
                print(f"恢复环境变量: {key}={original_value}")

# 使用示例:复杂的构建流程
def build_project():
    with temporary_directory(prefix="build_") as temp_dir:
        with change_directory(temp_dir):
            with environment_variables(
                BUILD_MODE="production",
                NODE_ENV="production",
                DEBUG="false"
            ):
                # 执行构建命令
                print("开始构建项目...")
                # os.system("npm run build")
                # os.system("python setup.py build")
                print("构建完成")
        
        # 这里临时目录会自动清理
    print("所有资源已清理")

# build_project()
from contextlib import contextmanager
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

class DatabaseManager:
    def __init__(self, connection_string):
        self.engine = create_engine(connection_string)
        self.Session = sessionmaker(bind=self.engine)
    
    @contextmanager
    def transaction(self, auto_commit=True):
        """数据库事务上下文管理器"""
        session = self.Session()
        try:
            yield session
            if auto_commit:
                session.commit()
                print("事务提交成功")
        except Exception as e:
            session.rollback()
            print(f"事务回滚: {e}")
            raise
        finally:
            session.close()
    
    @contextmanager
    def nested_transaction(self):
        """嵌套事务支持"""
        session = self.Session()
        try:
            # 开始嵌套事务
            session.begin_nested()
            yield session
            session.commit()
            print("嵌套事务提交")
        except Exception as e:
            session.rollback()
            print(f"嵌套事务回滚: {e}")
            raise
        finally:
            session.close()

# 使用示例
db = DatabaseManager("sqlite:///example.db")

# 基本事务
with db.transaction() as session:
    user = User(name="张三", email="zhangsan@example.com")
    session.add(user)
    # 如果这里发生异常,事务会自动回滚

# 复杂业务逻辑
with db.transaction() as session:
    # 创建用户
    user = User(name="李四", email="lisi@example.com")
    session.add(user)
    session.flush()  # 获取用户ID
    
    # 创建用户配置
    profile = UserProfile(user_id=user.id, settings={"theme": "dark"})
    session.add(profile)
    
    # 记录创建日志
    log = AuditLog(action="user_created", user_id=user.id)
    session.add(log)
    # 所有操作要么全部成功,要么全部回滚
class DatabaseConnection:
    def __init__(self, db_url):
        self.db_url = db_url
        self.connection = None
        self.transaction_count = 0
    
    def __enter__(self):
        print(f"连接到数据库: {self.db_url}")
        self.connection = self._create_connection()
        self.transaction_count = 0
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            print(f"发生异常: {exc_type.__name__}: {exc_val}")
            self.connection.rollback()
        else:
            self.connection.commit()
        
        print("关闭数据库连接")
        self.connection.close()
        # 返回 True 表示异常已被处理,不再向上传播
        return False  # 让异常正常传播
    
    def _create_connection(self):
        # 模拟创建数据库连接
        class MockConnection:
            def commit(self): print("提交事务")
            def rollback(self): print("回滚事务")
            def close(self): print("关闭连接")
            def execute(self, query): 
                print(f"执行: {query}")
                return f"结果: {query}"
        return MockConnection()
    
    def execute_query(self, query):
        self.transaction_count += 1
        return self.connection.execute(query)

# 使用示例
with DatabaseConnection("postgresql://localhost/mydb") as db:
    result1 = db.execute_query("SELECT * FROM users")
    result2 = db.execute_query("UPDATE products SET price = 100")
    print(f"执行了 {db.transaction_count} 个事务")

Python 装饰器

  • 日志记录: 装饰器可用于记录函数的调用信息、参数和返回值。

  • 性能分析: 可以使用装饰器来测量函数的执行时间。

  • 权限控制: 装饰器可用于限制对某些函数的访问权限。

  • 缓存: 装饰器可用于实现函数结果的缓存,以提高性能。

    • 类装饰器基于我们的 __call__ 实现的吧

  • 内置装饰器

    • @staticmethod: 将方法定义为静态方法,不需要实例化类即可调用。

    • @classmethod: 将方法定义为类方法,第一个参数是类本身(通常命名为 cls)。

    • @property: 将方法转换为属性,使其可以像属性一样访问。

def logger_decorator(func):
    """记录函数参数和返回值的装饰器"""
    def wrapper(*args, **kwargs):
        print(f"调用函数: {func.__name__}")
        print(f"参数: args={args}, kwargs={kwargs}")
        result = func(*args, **kwargs)
        print(f"返回值: {result}")
        return result
    return wrapper

@logger_decorator
def add(a, b):
    return a + b

@logger_decorator
def greet(name, greeting="Hello"):
    return f"{greeting}, {name}!"

# 使用
add(3, 5)
greet("Alice", greeting="Hi")



from functools import wraps

def preserve_metadata_decorator(func):
    """使用wraps保留原函数元信息"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        print(f"执行 {func.__name__}")
        return func(*args, **kwargs)
    return wrapper

@preserve_metadata_decorator
def example_function():
    """这是一个示例函数"""
    pass

print(example_function.__name__)  # example_function
print(example_function.__doc__)   # 这是一个示例函数




class TimerDecorator:
    """计时器类装饰器"""
    def __init__(self, func):
        self.func = func
        wraps(func)(self)
    
    def __call__(self, *args, **kwargs):
        import time
        start_time = time.time()
        result = self.func(*args, **kwargs)
        end_time = time.time()
        print(f"函数 {self.func.__name__} 执行时间: {end_time - start_time:.4f}秒")
        return result

class RetryDecorator:
    """重试机制类装饰器"""
    def __init__(self, max_retries=3, delay=1):
        self.max_retries = max_retries
        self.delay = delay
    
    def __call__(self, func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            import time
            last_exception = None
            
            for attempt in range(self.max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    print(f"尝试 {attempt + 1} 失败: {e}")
                    if attempt < self.max_retries - 1:
                        print(f"等待 {self.delay} 秒后重试...")
                        time.sleep(self.delay)
            
            print(f"所有 {self.max_retries} 次尝试均失败")
            raise last_exception
        return wrapper

# 使用类装饰器
@TimerDecorator
def slow_function():
    import time
    time.sleep(1)
    return "完成"

@RetryDecorator(max_retries=3, delay=2)
def unreliable_network_call():
    import random
    if random.random() < 0.7:  # 70% 失败率
        raise ConnectionError("网络连接失败")
    return "成功"

# 使用
slow_function()
try:
    unreliable_network_call()
except Exception as e:
    print(f"最终失败: {e}")




from functools import wraps
from flask import request, jsonify, g
import jwt

def login_required(f):
    """登录验证装饰器"""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        token = request.headers.get('Authorization')
        
        if not token:
            return jsonify({'error': '缺少认证令牌'}), 401
        
        try:
            # 验证JWT令牌
            payload = jwt.decode(token, 'secret-key', algorithms=['HS256'])
            g.user_id = payload['user_id']
            g.user_role = payload.get('role', 'user')
        except jwt.ExpiredSignatureError:
            return jsonify({'error': '令牌已过期'}), 401
        except jwt.InvalidTokenError:
            return jsonify({'error': '无效令牌'}), 401
        
        return f(*args, **kwargs)
    return decorated_function

def role_required(required_role):
    """角色权限装饰器"""
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            # 假设用户信息已经通过 login_required 设置到 g 对象中
            if not hasattr(g, 'user_role') or g.user_role != required_role:
                return jsonify({'error': '权限不足'}), 403
            return f(*args, **kwargs)
        return decorated_function
    return decorator

def rate_limit(requests_per_minute=60):
    """API限流装饰器"""
    import time
    from collections import defaultdict
    
    request_timestamps = defaultdict(list)
    
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            # 获取客户端标识(这里简单使用IP)
            client_ip = request.remote_addr
            current_time = time.time()
            
            # 清理1分钟前的记录
            request_timestamps[client_ip] = [
                ts for ts in request_timestamps[client_ip] 
                if current_time - ts < 60
            ]
            
            # 检查是否超过限制
            if len(request_timestamps[client_ip]) >= requests_per_minute:
                return jsonify({
                    'error': '请求过于频繁', 
                    'retry_after': 60
                }), 429
            
            # 记录本次请求
            request_timestamps[client_ip].append(current_time)
            
            return f(*args, **kwargs)
        return decorated_function
    return decorator

# 在Flask路由中使用
@app.route('/api/admin/users')
@login_required
@role_required('admin')
@rate_limit(requests_per_minute=30)
def get_all_users():
    # 只有管理员才能访问,且受到限流保护
    users = User.query.all()
    return jsonify([user.to_dict() for user in users])

@app.route('/api/profile')
@login_required
@rate_limit(requests_per_minute=60)
def get_user_profile():
    # 普通用户访问自己的资料,基础限流
    user = User.query.get(g.user_id)
    return jsonify(user.to_dict())



import time
from functools import wraps
import pickle
import hashlib

def cache(ttl=300, maxsize=1000):
    """缓存装饰器,支持TTL和最大缓存大小"""
    cache_store = {}
    cache_keys = []  # 用于LRU淘汰
    
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # 生成缓存键
            key_data = pickle.dumps((args, sorted(kwargs.items())))
            cache_key = hashlib.md5(key_data).hexdigest()
            
            # 检查缓存
            if cache_key in cache_store:
                cached_data, timestamp = cache_store[cache_key]
                if time.time() - timestamp < ttl:
                    print(f"缓存命中: {func.__name__}")
                    return cached_data
                else:
                    # 缓存过期
                    del cache_store[cache_key]
                    cache_keys.remove(cache_key)
            
            # 执行函数
            result = func(*args, **kwargs)
            
            # 更新缓存
            if len(cache_store) >= maxsize:
                # LRU淘汰
                oldest_key = cache_keys.pop(0)
                del cache_store[oldest_key]
            
            cache_store[cache_key] = (result, time.time())
            cache_keys.append(cache_key)
            
            return result
        return wrapper
    return decorator

def async_timer(func):
    """异步函数计时装饰器"""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        start_time = time.time()
        try:
            result = await func(*args, **kwargs)
            return result
        finally:
            end_time = time.time()
            print(f"异步函数 {func.__name__} 执行时间: {end_time - start_time:.4f}秒")
    return wrapper

def database_query_logger(func):
    """数据库查询日志装饰器"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        try:
            result = func(*args, **kwargs)
            execution_time = time.time() - start_time
            
            # 记录慢查询
            if execution_time > 1.0:  # 1秒以上认为是慢查询
                print(f"🚨 慢查询警告: {func.__name__} 耗时 {execution_time:.3f}秒")
            
            return result
        except Exception as e:
            print(f"❌ 查询错误: {func.__name__} - {e}")
            raise
    return wrapper

# 使用示例
@cache(ttl=600)  # 缓存10分钟
@database_query_logger
def get_user_by_id(user_id):
    # 模拟数据库查询
    time.sleep(0.5)
    return {"id": user_id, "name": f"User {user_id}"}

@async_timer
async def fetch_user_data_async(user_id):
    import asyncio
    await asyncio.sleep(0.3)
    return {"id": user_id, "data": "some_data"}




from functools import wraps
import jsonschema
from marshmallow import Schema, fields, ValidationError

def validate_json_schema(schema):
    """JSON Schema验证装饰器"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # 假设第一个参数是请求数据
            if args and isinstance(args[0], dict):
                data = args[0]
                try:
                    jsonschema.validate(instance=data, schema=schema)
                except jsonschema.ValidationError as e:
                    raise ValueError(f"数据验证失败: {e.message}")
            
            return func(*args, **kwargs)
        return wrapper
    return decorator

def validate_with_marshmallow(schema_class):
    """使用Marshmallow进行数据验证"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if args and isinstance(args[0], dict):
                data = args[0]
                schema = schema_class()
                try:
                    validated_data = schema.load(data)
                    # 用验证后的数据替换原始数据
                    new_args = (validated_data,) + args[1:]
                    return func(*new_args, **kwargs)
                except ValidationError as e:
                    raise ValueError(f"数据验证失败: {e.messages}")
            return func(*args, **kwargs)
        return wrapper
    return decorator

def transform_response(serializer_class):
    """响应数据序列化装饰器"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            result = func(*args, **kwargs)
            
            # 序列化结果
            if result is not None:
                serializer = serializer_class()
                if isinstance(result, list):
                    return [serializer.dump(item) for item in result]
                else:
                    return serializer.dump(result)
            return result
        return wrapper
    return decorator

# 定义数据模式
class UserSchema(Schema):
    id = fields.Int(required=True)
    name = fields.Str(required=True)
    email = fields.Email(required=True)

class UserCreateSchema(Schema):
    name = fields.Str(required=True, validate=lambda x: len(x) >= 2)
    email = fields.Email(required=True)
    password = fields.Str(required=True, validate=lambda x: len(x) >= 6)

# 使用示例
user_create_schema = {
    "type": "object",
    "properties": {
        "name": {"type": "string", "minLength": 2},
        "email": {"type": "string", "format": "email"},
        "password": {"type": "string", "minLength": 6}
    },
    "required": ["name", "email", "password"]
}

@validate_json_schema(user_create_schema)
# 或者使用 @validate_with_marshmallow(UserCreateSchema)
def create_user(user_data):
    # 这里user_data已经通过验证
    user = User(**user_data)
    user.save()
    return user

@transform_response(UserSchema)
def get_user(user_id):
    user = User.query.get(user_id)
    return user




import psutil
import threading
from functools import wraps
from contextlib import contextmanager

def circuit_breaker(failure_threshold=5, recovery_timeout=60):
    """断路器装饰器,防止级联故障"""
    state = {
        'failures': 0,
        'last_failure_time': None,
        'state': 'CLOSED'  # CLOSED, OPEN, HALF_OPEN
    }
    lock = threading.Lock()
    
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            with lock:
                current_time = time.time()
                
                # 检查断路器状态
                if state['state'] == 'OPEN':
                    if (current_time - state['last_failure_time']) > recovery_timeout:
                        state['state'] = 'HALF_OPEN'
                    else:
                        raise CircuitBreakerError("断路器打开,服务暂时不可用")
                
            try:
                result = func(*args, **kwargs)
                
                # 成功调用,重置断路器
                with lock:
                    if state['state'] == 'HALF_OPEN':
                        state['state'] = 'CLOSED'
                    state['failures'] = 0
                
                return result
                
            except Exception as e:
                with lock:
                    state['failures'] += 1
                    state['last_failure_time'] = current_time
                    
                    if state['failures'] >= failure_threshold:
                        state['state'] = 'OPEN'
                
                raise CircuitBreakerError(f"调用失败,断路器状态: {state['state']}") from e
        return wrapper
    return decorator

def resource_monitor(cpu_threshold=80, memory_threshold=80):
    """资源监控装饰器,在资源紧张时降级"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # 检查系统资源
            cpu_percent = psutil.cpu_percent(interval=1)
            memory_percent = psutil.virtual_memory().percent
            
            if cpu_percent > cpu_threshold or memory_percent > memory_threshold:
                print(f"⚠️ 资源紧张: CPU {cpu_percent}%, 内存 {memory_percent}%")
                # 可以在这里实现降级逻辑
                if hasattr(func, 'fallback'):
                    return func.fallback(*args, **kwargs)
                else:
                    raise ResourceWarning("系统资源紧张,服务降级")
            
            return func(*args, **kwargs)
        
        # 添加降级方法
        def fallback(*args, **kwargs):
            return {"status": "degraded", "message": "服务降级中"}
        
        wrapper.fallback = fallback
        return wrapper
    return decorator

def distributed_lock(lock_key_func, timeout=30):
    """分布式锁装饰器"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            lock_key = lock_key_func(*args, **kwargs)
            
            # 这里使用Redis作为分布式锁(需要redis-py)
            import redis
            redis_client = redis.Redis()
            
            lock_acquired = False
            try:
                # 尝试获取锁
                lock_acquired = redis_client.set(
                    lock_key, 'locked', nx=True, ex=timeout
                )
                
                if not lock_acquired:
                    raise DistributedLockError(f"无法获取锁: {lock_key}")
                
                return func(*args, **kwargs)
                
            finally:
                if lock_acquired:
                    redis_client.delete(lock_key)
        return wrapper
    return decorator

# 自定义异常
class CircuitBreakerError(Exception):
    pass

class DistributedLockError(Exception):
    pass

# 使用示例
@circuit_breaker(failure_threshold=3, recovery_timeout=30)
@resource_monitor(cpu_threshold=85, memory_threshold=90)
def process_payment(user_id, amount):
    # 模拟支付处理
    if amount > 1000:
        raise ValueError("金额过大")
    return {"status": "success", "transaction_id": "12345"}

# 为支付处理添加降级方法
def payment_fallback(user_id, amount):
    return {"status": "pending", "message": "支付处理延迟"}

process_payment.fallback = payment_fallback

# 分布式锁使用
def order_lock_key(user_id, *args, **kwargs):
    return f"lock:order:{user_id}"

@distributed_lock(lock_key_func=order_lock_key, timeout=60)
def create_order(user_id, items):
    # 创建订单,确保同一用户不会并发创建多个订单
    return {"order_id": "ORDER123", "status": "created"}

python 标准库总结

  • os 模块:os 模块提供了许多与操作系统交互的函数,例如创建、移动和删除文件和目录,以及访问环境变量等。

  • sys 模块:sys 模块提供了与 Python 解释器和系统相关的功能,例如解释器的版本和路径,以及与 stdin、stdout 和 stderr 相关的信息。

  • time 模块:time 模块提供了处理时间的函数,例如获取当前时间、格式化日期和时间、计时等。

  • datetime 模块:datetime 模块提供了更高级的日期和时间处理函数,例如处理时区、计算时间差、计算日期差等。

  • random 模块:random 模块提供了生成随机数的函数,例如生成随机整数、浮点数、序列等。

  • math 模块:math 模块提供了数学函数,例如三角函数、对数函数、指数函数、常数等。

  • re 模块:re 模块提供了正则表达式处理函数,可以用于文本搜索、替换、分割等。

  • json 模块:json 模块提供了 JSON 编码和解码函数,可以将 Python 对象转换为 JSON 格式,并从 JSON 格式中解析出 Python 对象。

  • urllib 模块:urllib 模块提供了访问网页和处理 URL 的功能,包括下载文件、发送 POST 请求、处理 cookies 等

Python json 模块