首页
统计
关于
Search
1
Sealos3.0离线部署K8s集群
1,127 阅读
2
类的加载
761 阅读
3
Spring Cloud OAuth2.0
740 阅读
4
SpringBoot自动装配原理
701 阅读
5
集合不安全问题
601 阅读
笔记
Java
多线程
注解和反射
JVM
JUC
设计模式
Mybatis
Spring
SpringMVC
SpringBoot
MyBatis-Plus
Elastic Search
微服务
Dubbo
Zookeeper
SpringCloud
Nacos
Sentinel
数据库
MySQL
Oracle
PostgreSQL
Redis
MongoDB
工作流
Activiti7
Camunda
消息队列
RabbitMQ
前端
HTML5
CSS
CSS3
JavaScript
jQuery
Vue2
Vue3
Linux
容器
Docker
Kubernetes
Python
FastApi
登录
Search
标签搜索
Java
CSS
mysql
RabbitMQ
JavaScript
Redis
JVM
Mybatis-Plus
Camunda
多线程
CSS3
Python
Spring Cloud
注解和反射
Activiti
工作流
SpringBoot
Mybatis
Spring
html5
蘇阿細
累计撰写
393
篇文章
累计收到
4
条评论
首页
栏目
笔记
Java
多线程
注解和反射
JVM
JUC
设计模式
Mybatis
Spring
SpringMVC
SpringBoot
MyBatis-Plus
Elastic Search
微服务
Dubbo
Zookeeper
SpringCloud
Nacos
Sentinel
数据库
MySQL
Oracle
PostgreSQL
Redis
MongoDB
工作流
Activiti7
Camunda
消息队列
RabbitMQ
前端
HTML5
CSS
CSS3
JavaScript
jQuery
Vue2
Vue3
Linux
容器
Docker
Kubernetes
Python
FastApi
页面
统计
关于
搜索到
393
篇与
的结果
2025-06-29
Helm
参照 b站王晓春老师 helm 课程一、安装官网:https://helm.sh/此处以二进制包安装为例:# K8s 管理节点 wget https://github.com/helm/helm/releases/download/v3.18.3/helm-v3.18.3-linux-amd64.tar.gz.asc tar -zxvf helm-v3.18.3-linux-amd64.tar.gz.asc -C /usr/local/bin ln -s /usr/local/bin/linux-amd64/helm /usr/bin/helm # 查看版本信息 helm version二、常用命令1. 管理类Repository 管理add、list、remove、upgrade、index子命令等Chart 管理create、package、pull、push、dependency、search、show、verify等Release 管理instll、upgrade、get、list、history、status、rollback、uninstall等2. 常用命令示例# repo helm repo list # 列出已添加的仓库 helm repo add [REPO_NAME] [URL] # 添加远程仓库并指定名称 helm repo remove [ROPE_NAME] # 删除仓库 helm repo update # 更新仓库,类似于 apt update helm search hub [xxx] # 从 artifacthub 搜索,类似于 docker search helm search repo [xxx] # 从本地仓库搜索 helm show chart [CHART] # 查看 chart 包的信息 helm show values [CHART] # 查看 chart 包下 values.yaml 的文件内容 # 拉取 helm pull repo/chartname # 拉取最新版本的 chart.tgz 到当前目录下 helm pull chart_URL # 从 url 拉取 helm pull repo/app --version 1.0 --untar # 拉取指定版本并解压 # 安装 helm install [NAME] [CHART] [--version <string>] # 安装指定版本的 chart helm install [CHART] --generate-name # 安装时自动生成 RELEASE_NAME helm install --set key1=value1,key2=value2 [RELEASE_NAME] [CHART] # 动态设置参数 helm install -f values.yaml [RELEASE_NAME] [CHART] # 引用指定文件配置 helm install --debug --dry-run [RELEASE_NAME] [CHART] # 调试但不实际运行,输出调试的结果 # 删除 helm uninstall [RELEASE_NAME] # 删除 release # 查看 helm list # 列出已安装的 release helm status [RELEASE_NAME] # 查看 release 状态 helm get notes [RELEASE_NAME] # 查看 release 的说明 helm get values [RELEASE_NAME] -n [NAMESPACE] > values.yaml # 查看 release 生成的 values 并导出至指定文件 helm get mainfest [RELEASE_NAME] -n [NAMESPACE] # 查看 release 生成的资源清单文件 # 升级和回滚 helm upgrade [RELEASE_NAME] [CHART] --set key=value # 更新 release,同时动态设置参数 helm upgrade [RELEASE_NAME] [CHART] -f chartname/values.yaml # 同上 helm rollback [RELEASE_NAME] [REVISION] # 回滚至指定版本,不指定版本时默认回滚至上一个版本 helm history [RELEASE_NAME] # 查看历史记录 # 打包 helm package chartname/ # 将指定目录下的 chart 打包为 chartname.tgz 包至当前目录下3. 安装 chart 的六种方式By chart reference:helm install mysql xxx/mysqlBy path to a packaged chart:helm install mysql ./mysql-5.7.35.tgzBy path to an unpacked chart directory:helm install mysql ./mysqlBy absolute URL:helm install mysql https://xxx.com/charts /mysql-5.7.35.tgzBy chart reference and repo url:helm install --repo https://xxx .com/charts/mysql mysqlBy oci registries:helm install mysql --version 5.7.35 oci://xxx.com/charts /mysql
2025年06月29日
3 阅读
0 评论
0 点赞
2025-06-19
中间件
四、中间件1. 概念"中间件"是一个函数,它在每个请求被特定的路径操作处理之前,以及在每个响应之后工作。2. 使用在函数的顶部使用装饰器 @app.middleware("http")中间件接收的参数:requestcall_next 函数接收 request,作为参数在处理完相应的逻辑后,将 request 传递给下一步操作返回对应路径请求的 response3. 示例from fastapi import FastAPI, Request, Response app = FastAPI() @app.middleware("http") async def middleware2(request: Request, call_next): print("中间件2-请求") response = await call_next(request) print("中间件2-响应") return response @app.middleware("http") async def middleware1(request: Request, call_next): print("中间件1-请求") response = await call_next(request) print("中间件1-响应") return response @app.get("/{user_id}") async def get_user(user_id: int): print(f'查询用户id为: {user_id}的用户') return { "user": "孙笑川" } if __name__ == '__main__': import uvicorn uvicorn.run("main:app", host="127.0.0.1", port=8088, reload=True) 当请求 get_user 时,控制台会打印如下信息(中间件需按顺序写,从上至下倒序):4. CORS 中间件示例from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware app = FastAPI() origins = [ "http://localhost:8089" ] app.add_middleware( CORSMiddleware, allow_origins=origins, # 值为*时代表所有客户端 allow_credentials=True, allow_methods=["GET", "POST", "PUT", "DELETE"], allow_headers=["*"], ) @app.get("/") def main(): return { "message": "你好,孙笑川" } if __name__ == '__main__': import uvicorn uvicorn.run("main:app", host="127.0.0.1", port=8088, reload=True)
2025年06月19日
7 阅读
0 评论
0 点赞
2025-06-16
请求与响应
1. 路径参数(1)基本使用@app.get("/user/{user_id}") async def get_user(user_id): print(user_id, type(user_id)) return { "user_id": user_id }(2)声明路径参数类型此例中参数类型为 int@app.get("/user/{user_id}") async def get_user(user_id: int): print(user_id, type(user_id)) return { "user_id": user_id }(3)注意定义的函数顺序问题""" 注意定义的函数顺序问题 例:有请求路径为 /user/list 的方法,它需要置于 /user/{user/id} 之前 否则会出现请求 /user/list 方法,但请求最终落在了 /user/{user_id} 方法上的问题 """ @app.get("/user/list") async def get_user_list(): return tuple([ { "user": "user1" }, { "user": "user2" } ]) @app.get("/user/{user_id}") async def get_user(user_id: int): print(user_id, type(user_id)) return { "user_id": user_id }2. 查询参数(请求参数)路径函数中声明不属于路径参数的其他函数参数时,它们将被自动解释为查询字符串参数,就是 url? 之后用&分割的 key-value 键值对。from typing import Union from fastapi import FastAPI app = FastAPI() """ 此例中有路径参数 user_id,查询参数 dept_id 参数 dept_id 通过 Union 声明为 str、int 类型,或者为None(不传) """ @app.get("/user/{user_id}") async def get_user(user_id: int, dept_id: Union[str, int] = None): print(f'user_id={user_id}, dept_id={dept_id}') return { "user_id": user_id, "dept_id": dept_id } if __name__ == '__main__': import uvicorn uvicorn.run("main:app", host="127.0.0.1", port=8088, reload=True) 补充:自 Python 3.5 开始,PEP484 为 Python 引入了类型注解(type hints)类型检查,防止运行时出现参数、返回值类型不符作为开发文档附加说明,方便使用者调用时传入和返回参数类型模块加入不会影响程序的运行不会报正式的错误,pycharm 支持 typing 检查错误时会出现黄色警告Union 和 Optional,Optional[str] 是 Union[str] = None 的语法糖3. 请求体数据FastApi 请求体数据基于 Pydantic,Pydantic 主要用于做类型强制检查(校验数据)。(1)安装pip install pydantic(2)使用from typing import Union, Optional from fastapi import FastAPI from pydantic import BaseModel, Field, validator app = FastAPI() class User(BaseModel): id: int name: str age: int = Field(default=0, gt=0, lt=100) no: int address: Optional[str] @validator('name') def name_validator(cls, val): assert len(val) >= 2, '用户名不能少于2个字' return val @app.post("/user") async def add(user: User): print(f'新增用户: {user}') return user if __name__ == '__main__': import uvicorn uvicorn.run("main:app", host="127.0.0.1", port=8088, reload=True) 补充:FastApi 会自动将定义的模型类转化为 JSON Schemaswagger 文档参数校验当接口请求数据为{ "id": 1, "name": "测", "age": 0, "no": 10, "address": "成都" }服务端参数校验返回4. form 表单数据在 FastAPI 中可以使用 Form 组件来接收表单数据(1)安装pip install python-multipart(2)使用from fastapi import FastAPI, Form app = FastAPI() @app.post("/user") async def add(id: str = Form(..., regex='[a-zA-Z]'), name: str = Form(..., min_length=2, max_length=6)): print(f'新增用户: id={id}, name={name}') return { "id": id, "name": name } if __name__ == '__main__': import uvicorn uvicorn.run("main:app", host="127.0.0.1", port=8088, reload=True) 5. 文件上传from typing import List from fastapi import FastAPI, File, UploadFile app = FastAPI() @app.post("/upload-file") async def upload_file(file: bytes = File()): print(f'file_len={len(file)}') return "上传成功" @app.post("/upload-files") async def upload_files(files: List[bytes] = File()): print(f'files_len={len(files)}') return "上传成功" # 适合大文件上传 @app.post("/upload-multi-files") async def upload_multi_files(files: List[UploadFile] = File()): print(f'files_len={len(files)}') return "上传成功" if __name__ == '__main__': import uvicorn uvicorn.run("main:app", host="127.0.0.1", port=8088, reload=True) 6. Request 对象在函数中声明 Request 类型的参数,FastAPI 会自动传递 Request 对象给这个参数,可以获取其中的 header, url, cookie, session 等信息from fastapi import FastAPI, Request app = FastAPI() @app.get("/request_info") async def request_info(request: Request): request_info_dict = { "method": request.method, "url": request.url, "params": request.query_params, "headers": request.headers, "cookies": request.cookies, "ip": request.client.host, "user-agent": request.headers.get("User-Agent") } print(f'request_info_dict={request_info_dict}') return request_info_dict if __name__ == '__main__': import uvicorn uvicorn.run("main:app", host="127.0.0.1", port=8088, reload=True) 7. 请求静态文件web开发中,常涉及静态文件等资源,如css、js、图片、icon等,可在项目中新建 static 文件夹,将静态资源放入该文件夹后做如下配置from fastapi import FastAPI from fastapi.staticfiles import StaticFiles app = FastAPI() app.mount("/static", StaticFiles(directory="static")) if __name__ == '__main__': import uvicorn uvicorn.run("main:app", host="127.0.0.1", port=8088, reload=True) 8. 响应参数(1)response_modelFastAPI 提供了 response_model 参数,声明 return 响应体的模型将输出数据转换为 response_model 中声明的数据类型验证数据结构和类型将输出数据限制为该 model 定义的添加到 OpenAPI 中在自动文档系统中使用以新增用户为例from typing import Optional from fastapi import FastAPI from pydantic import BaseModel, Field, validator app = FastAPI() class User(BaseModel): id: int name: str age: int = Field(default=0, gt=0, lt=100) no: int address: Optional[str] @validator('name') def name_validator(cls, val): assert len(val) >= 2, '用户名不能少于2个字' return val class UserVo(BaseModel): id: int name: str age: int = Field(default=0, gt=0, lt=100) no: int @app.post("/user", response_model=UserVo) async def add(user: User): print(f'新增用户: {user}') return user if __name__ == '__main__': import uvicorn uvicorn.run("main:app", host="127.0.0.1", port=8088, reload=True) (2)response_model_exclude_unsetfrom typing import Optional from fastapi import FastAPI from pydantic import BaseModel, Field, validator app = FastAPI() class User(BaseModel): id: int name: str age: int = Field(default=0, gt=0, lt=100) no: int address: Optional[str] = None @validator('name') def name_validator(cls, val): assert len(val) >= 2, '用户名不能少于2个字' return val @app.post("/response_model_exclude_unset_user", response_model=User, response_model_exclude_unset=True) async def response_model_exclude_unset_add(user: User): print(f'新增用户: {user}') return user if __name__ == '__main__': import uvicorn uvicorn.run("main:app", host="127.0.0.1", port=8088, reload=True) (3)response_model_exclude_defaults不返回为默认值的字段(4)response_model_exclude_none不返回为 None 的字段(5)INCLUDE和EXCLUDEINCLUDE:只返回 INCLUDE 声明了的字段EXCLUDE:返回 EXCLUDE 声明了的之外的字段(即排除指定字段)
2025年06月16日
5 阅读
0 评论
0 点赞
2025-06-15
路径操作
1. 路径操作装饰器(1)FastApi支持的请求方式@app.get() @app.post() @app.put() @app.patch() @app.delete() @app.options() @app.head() @app.trace()(2)路径操作装饰器参数@app.post( "/user/{user_id}", response_model=User, status_code=status.HTTP_200_OK, tags=["用户模块"], summary="根据user_id查询用户信息", description="根据user_id查询用户信息", response_description= "用户信息", deprecated=False, )2. include_router项目结构__init.py__# __init__.py from .user import user from .dept import dept dept.pyfrom fastapi import APIRouter dept = APIRouter() @dept.get("/{dept_id}") async def get_dept(dept_id): return { "dept": dept_id } user.pyfrom fastapi import APIRouter user = APIRouter() @user.get("/{user_id}") async def get_user(user_id): return { "user": user_id } main.pyfrom fastapi import FastAPI from apps import user, dept app = FastAPI() app.include_router(user, prefix="/user", tags=["用户模块", ]) app.include_router(dept, prefix="/dept", tags=["部门模块", ]) if __name__ == '__main__': import uvicorn uvicorn.run("main:app", host="127.0.0.1", port=8088, reload=True) 启动项目后访问docs
2025年06月15日
9 阅读
0 评论
0 点赞
2025-06-13
快速开始
参考 B站苑昊老师 FastApi 课程一、QuickStart1. 安装# fastapi pip install fastapi # uvicorn pip install uvicorn2. 测试 demo(1)main.pyfrom fastapi import FastAPI app = FastAPI() @app.get("/") async def index(): return "Hello World!"(2)启动# 通过命令行 uvicorn main:app --reload# 通过 main 函数 if __name__ == '__main__': import uvicorn uvicorn.run("main:app", host="127.0.0.1", port=8088, reload=True)(3)接口文档启动后通过 http://127.0.0.1:8088/docs 即可访问 swagger
2025年06月13日
11 阅读
0 评论
0 点赞
2025-03-18
面向对象
1. 类# 类 class Person(object): # object 基类 pass # 创建对象(类的实例化) student = Person() print(type(student)) # <class '__main__.Person'> print(isinstance(student, object)) print(isinstance(student, Person)) 2. 实例属性# 实例属性 class Person(object): def __init__(self, name, age, address): self.name = name self.age = age self.address = address student = Person('孙笑川', 33, '成都') student.job = '主播' print(student.__dict__) # 获取实例的所有属性 3. 类属性# 类属性 class Person(object): index = 0 # 类属性 def __init__(self, name, age, address): self.name = name self.age = age self.address = address Person.index += 1 if age < 0 or age > 100: raise Exception('年龄有误,请检查!') try: student = Person('孙笑川', 33, '成都') print(student.__dict__) print('第%d个用户' % Person.index) student1 = Person('刘波', 30, '湖北') print(student1.__dict__) print('第%d个用户' % Person.index) except Exception as e: print(e) 4. 实例方法# 实例方法 class Person(object): index = 0 # 类属性 def __init__(self, name, age, address): self.name = name self.age = age self.address = address Person.index += 1 # print('第%d个用户' % Person.index) # 实例方法 def say_hello(self): print('大家好,我是%s' % self.name) try: student = Person('孙笑川', 33, '成都') # print(student.__dict__) student.say_hello() student1 = Person('刘波', 30, '湖北') # print(student1.__dict__) except Exception as e: print(e) 5. 类方法# 类方法 class Person(object): index = 0 # 类属性 def __init__(self, name, age, address): self.name = name self.age = age self.address = address Person.index += 1 if age < 0 or age > 100: raise Exception('年龄有误,请检查!') # 实例方法 def say_hello(self): print('大家好,我是%s' % self.name) # 类方法 @classmethod def get_user_no(cls): print('第%d个用户' % cls.index) try: student = Person('孙笑川', 33, '成都') # print(student.__dict__) student.say_hello() student1 = Person('刘波', 30, '湖北') # print(student1.__dict__) Person.get_user_no() except Exception as e: print(e) 6. 静态方法# 静态方法 class Person(object): index = 0 # 类属性 def __init__(self, name, age, address): self.name = name self.age = age self.address = address Person.index += 1 if age < 0 or age > 100: raise Exception('年龄有误,请检查!') # 实例方法 def say_hello(self): print('大家好,我是%s' % self.name) # 类方法 @classmethod def get_user_no(cls): print('第%d个用户' % cls.index) # 静态方法 @staticmethod def is_valid(**kwargs): if kwargs['token'] == '123456': return True else: print('前置校验失败') return False try: check_key = {'token': '123456'} if Person.is_valid(**check_key): student = Person('孙笑川', 33, '成都') # print(student.__dict__) student.say_hello() student1 = Person('刘波', 30, '湖北') # print(student1.__dict__) Person.get_user_no() else: print('创建用户失败') except Exception as e: print(e) 7. 继承# 继承 class Person(object): index = 0 # 类属性 def __init__(self, name, age, address): self.name = name self.age = age self.address = address Person.index += 1 if age < 0 or age > 100: raise Exception('年龄有误,请检查!') # 实例方法 def say_hello(self): print('大家好,我是%s' % self.name) # 类方法 @classmethod def get_user_no(cls): print('第%d个用户' % cls.index) # 静态方法 @staticmethod def is_valid(**kwargs): if kwargs['token'] == '123456': return True else: print('前置校验失败') return False class People(Person): # 重写父类的 say_hello 方法 def say_hello(self): print('大家好,我是%s,我来自%s' % (self.name, self.address)) try: check_key = {'token': '123456'} if Person.is_valid(**check_key): student = Person('孙笑川', 33, '成都') # print(student.__dict__) student.say_hello() student1 = People('刘波', 30, '湖北') # print(student1.__dict__) student1.say_hello() else: print('创建用户失败') except Exception as e: print(e) 8. 多态# 多态 class Animal(object): def say(self): print('Animal Say...') class Dog(Animal): def say(self): print('汪汪汪...') class Cat(Animal): def say(self): print('喵喵喵...') animal = Animal() animal.say() dog = Dog() dog.say() cat = Cat() cat.say() 9. 封装# 封装 class Person(object): def __init__(self, name, age, address): # self._name = name # 受保护的变量 self.__name = name # 私有变量 self.__age = age self.__address = address if not isinstance(age, int) or age < 0 or age > 100: raise Exception('年龄有误,请检查!') """ 把函数当作变量使用 @property def 变量名(self): # 获取变量 get @变量名.setter def 变量名(self, 变量名): # 设置变量 set """ @property def name(self): return self.__name @name.setter def name(self, name): self.__name = name @property def age(self): return self.__age @age.setter def age(self, age): if not isinstance(age, int) or age < 0 or age > 100: raise Exception('年龄有误,请检查!') else: self.__age = age @property def address(self): return self.__address @address.setter def address(self, address): self.__address = address try: student = Person('孙笑川', 33, '成都') print(student.name) # 修改属性 student.name = '刘波' print(student.__dict__) except Exception as e: print(e)
2025年03月18日
52 阅读
0 评论
0 点赞
2025-03-17
文件及IO操作
1. 读取文件# 读取文件 import os # 绝对路径 # absolute_path = os.getcwd() # file_name = absolute_path + '/test.txt' # f = open(file_name, mode='r', encoding='utf-8') # 相对路径 f = open('test.txt', mode='r', encoding='utf-8') # content = f.read() # 读取全部 # content = f.readline() # 读取一行 content = f.readlines() # 按行读取 print(content) # 关闭 f.close() 2. 写文件# 写文件 f = open('test.txt', mode='w', encoding='utf-8') f.write('大家好\n') write_content = ['孙笑川\n', '刘波\n'] f.writelines(write_content) f.close() f = open('test.txt', mode='r', encoding='utf-8') read_content = f.read() print(read_content) f.close() 3. 追加写# 追加写 f = open('test.txt', mode='a', encoding='utf-8') f.write('你好\n') content = ['带带大师兄\n', '药水哥\n'] f.writelines(content) f.close() 4. with# with open() as f 操作文件 with open('test.txt', mode='r', encoding='utf-8') as f: print(f.read()) print('使用with语法糖读取文件内容完成')
2025年03月17日
18 阅读
0 评论
0 点赞
2025-03-16
模块
此处以 random、正则、socket 为例1. random# random import random # 随机小数 print(random.random()) # 1 - 10 之间的随机数 print(random.randint(1, 10)) 2. 正则# 正则 import re # \d 数字 result = re.match(r'\d+', '123abc') print(result) # 123 # \w 数字、字母、下划线 result = re.match(r'\w+', '1a2b3c!') print(result) # 1a2b3c # \s 空白字符串;\S 非空字符串 result = re.match(r'^\s+$', '1 2 3') print(result) # None result = re.match(r'^\S+$', '1 abc') print(result) # None # . 任意字符 result = re.match(r'.+$', 'abc 1q2') print(result) # abc 1q2 # [] 区间 result = re.match(r'^a[1a]', 'a123456') print(result) # a1 # | 或 result = re.match(r'^a|1|c$', 'a12345c') print(result) # a 3. socket3.1 server# socket import socket sk = socket.socket() sk.bind(('127.0.0.1', 8088)) sk.listen(5) conn, addr = sk.accept() print(conn) print(addr) while True: accept_data = conn.recv(1024) print('收到客户端发送的消息:', accept_data.decode('utf-8')) send_data = input('请输入要回复的消息:') conn.send(send_data.encode('utf-8'))3.2 client# socket import socket sk = socket.socket() sk.connect(('127.0.0.1', 8088)) while True: sand_data = input('请输入要发送的消息:') sk.send(sand_data.encode('utf-8')) accept_data = sk.recv(1024) print(accept_data.decode('utf-8'))
2025年03月16日
25 阅读
0 评论
0 点赞
1
2
...
50