Python多脚本的集成化管理(微服务,Nginx)
Published in:2023-12-29 |
Words: 1.2k | Reading time: 6min | reading:

基本思想

通过微服务、或nginx通过某种方式集成管理各个分散python脚本

技术路线

Nginx代理转发

通过配置脚本微服务web功能,通过nginx转发API。在此之前需要手动编写脚本web功能和API接口。

nginx 配置文件编写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
server {
#对外监听端口
listen 9001;
#主机名称
server_name localhost;
#规则:当请求路径包含‘eduservice’的时候转发到“http://loclahost:8001”
# ~ 代表正则匹配,不加~ 则为完全匹配才能执行
#请求路径
location ~ /eduservice/ {
#转发地址
proxy_pass http://localhost:8001;
}
#规则:当请求路径包含‘eduoss’的时候转发到“http://loclahost:8002”
location ~ /eduoss/ {
proxy_pass http://localhost:8002;
}
}

原py文件改造

主py文件编写

  • main.py
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    # from flask import Flask
    import logging
    import sys

    from nameko.standalone.rpc import ClusterRpcProxy

    import uvicorn
    from fastapi import FastAPI
    from loguru import logger
    from app.http_server.log.log_base import LOG_DIR
    from app.http_server.log.log_config import InterceptHandler, format_record
    from app.http_server.router.router import api_router
    from app.run.constant import app_port, log_man_input_path, log_man_out_path, publish_date, build_date, py_server_version


    def init_app():
    """
    APP init : log router ...
    :return:
    """
    # fast api app 启动项配置与启动
    app = FastAPI(title="CTS Log Reporter", version="0.0.1",
    description="Calmcar Tools System: Log analysis and reporter platform", debug=True)
    # 路由引入
    app.include_router(router=api_router, prefix="/api/v1")
    # 日志配置与捕获
    logging.getLogger().handlers = [InterceptHandler()]
    logger.configure(
    handlers=[{"sink": sys.stdout, "level": logging.DEBUG, "format": format_record}])
    logger.add(LOG_DIR, encoding='utf-8', rotation="12:00")
    logger.debug('日志系统已加载... 当前日志所在目录:' + LOG_DIR)
    logging.getLogger("uvicorn.access").handlers = [InterceptHandler()]
    return app


    app = init_app()
    # MQ配置


    if __name__ == '__main__':
    uvicorn.run(app='main:app', host='0.0.0.0', port=app_port, reload=True, workers=4)

API路由定义

  • router.py
    1
    2
    3
    4
    5
    6
    from fastapi import APIRouter
    from app.http_server.view import log_process

    api_router = APIRouter()
    # 模块路由配置
    api_router.include_router(log_process.router, prefix="/shell", tags=['脚本操作'])

API 接口定义

  • api_manage.py
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    from fastapi import APIRouter
    from nameko.standalone.rpc import ClusterRpcProxy

    config_mq = {'AMQP_URI': "amqp://guest:guest@192.168.163.129:5672"}


    router = APIRouter()


    @router.get('/hello_world')
    def call_service():
    with ClusterRpcProxy(config_mq) as rpc:
    # 消费者调用微服务(生产者),获取服务(生产者)的返回值
    result = rpc.generate_service.hello_world(msg="xag msg")

    # 返回结果
    return result, 200

微服务网关转发

如图:代理网关转发来自web前端请求,并根据api转发对应微服务脚本。Gateway与微服务之间采用gRPC,微服务之间采用http(REST API)通信。

img

nameko

img

Nameko框架采用生产者-消费者模式。

搭建nameko框架

1
pip install nameko

构建生产者服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# producer_service.py

from nameko.rpc import rpc

class GenerateService(object):
# 定义微服务名称 此处调用指定脚本
name = "generate_service"

@rpc
def hello_world(self, msg):
print('hello,i am been called by customer(消费者),返回消息:{}'.format(msg))

# 返回结果
return "Hello World!I Am a msg from producer!"

通过rabbitmq注册微服务

1
2
3
4
5
6
# 注册服务
# producer_service:目标文件
# admin:admin:MQ用户名及密码
# ip地址:5672:MQ服务器ip地址及应用端口号
# my_vhost:虚拟机名
nameko run producer_service --broker amqp://admin:admin@ip地址:5672/my_vhost

消费者调用脚本微服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

config_mq = {'AMQP_URI': "amqp://guest:guest@192.168.163.129:5672"}


router = APIRouter()


@router.get('/hello_world')
def call_service():
with ClusterRpcProxy(config_mq) as rpc:
# 消费者调用微服务(生产者),获取服务(生产者)的返回值
result = rpc.generate_service.hello_world(msg="xag msg")

# 返回结果
return result, 200

生产者服务启动方式

  • 命令行启动

    1
    nameko run generate_service --broker amqp://guest:guest@192.168.163.129:5672/
  • 配置文件启动

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    # config.yml
    AMQP_URI: 'pyamqp://guest:guest@localhost'
    WEB_SERVER_ADDRESS: '0.0.0.0:8000'
    rpc_exchange: 'nameko-rpc'
    max_workers: 10
    parent_calls_tracked: 10

    LOGGING:
    version: 1
    handlers:
    console:
    class: logging.StreamHandler
    root:
    level: DEBUG
    handlers: [console]
    # 启动命令行:
    # nameko run --config config.yml service1
  • 结果示例:

    调用者
    img
    被调用者
    img

原web服务改造

将API迁移至消费者中,仅保留具体执行方法;安装nameko,编写生产者通过rpc调用的方法。

rabbitmq服务构建

构建docker容器

1
2
docker pull rabbitmq
docker run -d --hostname localhost --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:management

web页面管理

1
2
3
docker ps 
docker exec -it contain ID /bin/bash
rabbitmq-plugins enable rabbitmq_management

Bali

Bali is a framework integrate FastAPI and gRPC. If you want to provide both HTTP and RPC, it can improve development efficiency.
业务代码分布为在两个层次:Model 层和 Resource 层。Model 层是一个 Fat Model 模型,除了字段定义,还有与 Model 相关的一些逻辑;Resource 层提供同时兼容 HTTP/RPC 的资源服务。遵循 RESTful 及 gRPC 的开发指南定义了几个标准方法。

install

1
2
3
4
5
6
# Bali framework
pip install bali-core

# Bali command line tool
pip install bali-cli

application create

1
2
3
4
5
6
7

import greeter_server

# Initialized App
app = Bali()
# Updated settings
app.settings(base_settings={'title': 'Bali App'})

app start

1
2
3
4
5
6
7
8
9
# With bali-cli 
bali run http
bali run rpc
bali run event

python main.py run --http # launch HTTP in development mode
python main.py run --rpc # launch RPC
python main.py run --event # launch Event

test demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
创建gRPC 测试代码
from bali.tests import GRPCTestBase
from service.demo import demo_service, demo_pb2, demo_pb2_grpc


class TestDemoRPC(GRPCTestBase):
server_class = demo_service.DemoService # Provided service

pb2 = demo_pb2 # Provided pb2
pb2_grpc = demo_pb2_grpc # Provided pb2 grpc

def setup_method(self): # Pytest setup
pass

def teardown_method(self): # Pytest teardown
pass

def test_demo(self):
pass

See

Prev:
python使用selenium框架动态抓取指定内容
Next:
SMB共享服务python脚本操作文件上传下载