File size: 4,908 Bytes
3079197
9bf75d4
3079197
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9bf75d4
3079197
9bf75d4
 
3079197
 
 
 
 
 
 
 
 
 
 
6be3dd5
3079197
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6be3dd5
3079197
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6be3dd5
 
3079197
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
#
#  Copyright 2021 The InfiniFlow Authors. All Rights Reserved.
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#
import abc
import json
import time
from functools import wraps
from shortuuid import ShortUUID

from api.versions import get_rag_version

from api.errors.error_services import *
from api.settings import (
    GRPC_PORT, HOST, HTTP_PORT,
    RANDOM_INSTANCE_ID,  stat_logger,
)


# Identifier of this process: either a random 8-character short UUID or a
# deterministic name derived from the configured host and HTTP port.
if RANDOM_INSTANCE_ID:
    instance_id = ShortUUID().random(length=8)
else:
    instance_id = f'flow-{HOST}-{HTTP_PORT}'

# Pair of (service url, JSON value) describing this server. `ServicesDB`
# unpacks it in `register_flow` and uses the first element in
# `unregister_flow`. The timestamp is the module import time in milliseconds.
server_instance = (
    f'{HOST}:{GRPC_PORT}',
    json.dumps(dict(
        instance_id=instance_id,
        timestamp=round(time.time() * 1000),
        version=get_rag_version() or '',
        host=HOST,
        grpc_port=GRPC_PORT,
        http_port=HTTP_PORT,
    )),
)


def check_service_supported(method):
    """Guard a method so it only runs for supported service names.

    The owning class MUST expose a `supported_services` attribute, and the
    decorated `method` MUST take `self` and `service_name` as its first two
    positional arguments. Any further positional or keyword arguments are
    forwarded unchanged.

    :param Callable method: The class method to protect.
    :return: A wrapper that validates `service_name` before delegating.
    :rtype: Callable
    :raises ServiceNotSupported: From the wrapper, when `service_name` is not
        in `self.supported_services`.
    """
    @wraps(method)
    def guarded(self, service_name, *args, **kwargs):
        # Happy path first: delegate as soon as the name is known to be valid.
        if service_name in self.supported_services:
            return method(self, service_name, *args, **kwargs)
        raise ServiceNotSupported(service_name=service_name)
    return guarded


class ServicesDB(abc.ABC):
    """Abstract registry of service urls.

    Concrete backends supply `supported_services` plus the underscore-prefixed
    primitives (`_get_serving`, `_insert`, `_delete`, `_get_urls`). The public
    methods wrap those primitives: a `ServicesError` raised by the backend is
    logged through `stat_logger` and swallowed, so callers get a best-effort
    result instead of an exception.
    """

    @property
    @abc.abstractmethod
    def supported_services(self):
        """Names of the services this backend accepts.

        The returned list SHOULD contain `ragflow` (model download) and
        `servings` (RAG-Serving). `register_flow` and `unregister_flow` pass
        the name `flow-server` through the same membership check, so that name
        has to be listed as well for them to succeed.

        :return: The service names.
        :rtype: list
        """

    @abc.abstractmethod
    def _get_serving(self):
        """Backend primitive behind `get_serving`; may raise `ServicesError`."""

    def get_serving(self):
        """Fetch the serving entries from the backend.

        :return: Whatever `_get_serving` returns, or an empty list when the
            backend raises `ServicesError` (the error is logged).
        """
        try:
            servings = self._get_serving()
        except ServicesError as err:
            stat_logger.exception(err)
            return []
        return servings

    @abc.abstractmethod
    def _insert(self, service_name, service_url, value=''):
        """Backend primitive behind `insert`; may raise `ServicesError`."""

    @check_service_supported
    def insert(self, service_name, service_url, value=''):
        """Store a service url in the database.

        A `ServicesError` from the backend is logged and not re-raised.

        :param str service_name: The service name.
        :param str service_url: The service url.
        :param str value: Payload forwarded to `_insert` together with the
            url; defaults to an empty string.
        :return: None
        :raises ServiceNotSupported: If `service_name` is not in
            `supported_services`.
        """
        try:
            self._insert(service_name, service_url, value)
        except ServicesError as err:
            stat_logger.exception(err)

    @abc.abstractmethod
    def _delete(self, service_name, service_url):
        """Backend primitive behind `delete`; may raise `ServicesError`."""

    @check_service_supported
    def delete(self, service_name, service_url):
        """Remove a service url from the database.

        A `ServicesError` from the backend is logged and not re-raised.

        :param str service_name: The service name.
        :param str service_url: The service url.
        :return: None
        :raises ServiceNotSupported: If `service_name` is not in
            `supported_services`.
        """
        try:
            self._delete(service_name, service_url)
        except ServicesError as err:
            stat_logger.exception(err)

    def register_flow(self):
        """Insert this flow server's address into the database via `insert`.

        :return: None
        """
        # `server_instance` is the module-level (url, JSON value) pair.
        address, payload = server_instance
        self.insert('flow-server', address, payload)

    def unregister_flow(self):
        """Delete this flow server's address from the database via `delete`.

        :return: None
        """
        address = server_instance[0]
        self.delete('flow-server', address)

    @abc.abstractmethod
    def _get_urls(self, service_name, with_values=False):
        """Backend primitive behind `get_urls`; may raise `ServicesError`."""

    @check_service_supported
    def get_urls(self, service_name, with_values=False):
        """Query service urls from the database.

        The urls may belong to other nodes. Currently, only `ragflow` (model
        download) urls and `servings` (RAG-Serving) urls are supported.
        `ragflow` is a url containing scheme, host, port and path, while
        `servings` only contains host and port.

        :param str service_name: The service name.
        :param bool with_values: Forwarded unchanged to `_get_urls`;
            presumably asks the backend to return the stored values along
            with the urls (confirm against the concrete backend).
        :return: The service urls, or an empty list when the backend raises
            `ServicesError` (the error is logged).
        :rtype: list
        :raises ServiceNotSupported: If `service_name` is not in
            `supported_services`.
        """
        try:
            urls = self._get_urls(service_name, with_values)
        except ServicesError as err:
            stat_logger.exception(err)
            return []
        return urls