mirror of
https://github.com/outbackdingo/patroni.git
synced 2026-01-27 18:20:05 +00:00
- added pyrightconfig.json with typeCheckingMode=strict - added type hints to all files except api.py - added type stubs for dns, etcd, consul, kazoo, pysyncobj and other modules - added type stubs for psycopg2 and urllib3 with some little fixes - fixes most of the issues reported by pyright - remaining issues will be addressed later, along with enabling CI linting task
25 lines
1.8 KiB
Python
25 lines
1.8 KiB
Python
from typing import Any, Dict, List, Optional, Tuple
|
|
class ConsulException(Exception): ...
|
|
class NotFound(ConsulException): ...
|
|
class Check:
|
|
@classmethod
|
|
def http(klass, url: str, interval: str, timeout: Optional[str] = None, deregister: Optional[str] = None) -> Dict[str, str]: ...
|
|
class Consul:
|
|
http: Any
|
|
agent: 'Consul.Agent'
|
|
session: 'Consul.Session'
|
|
kv: 'Consul.KV'
|
|
class KV:
|
|
def get(self, key: str, index: Optional[int]=None, recurse: bool = False, wait: Optional[str] = None, token: Optional[str] = None, consistency: Optional[str] = None, keys: bool = False, separator: Optional[str] = '', dc: Optional[str] = None) -> Tuple[int, Dict[str, Any]]: ...
|
|
def put(self, key: str, value: str, cas: Optional[int] = None, flags: Optional[int] = None, acquire: Optional[str] = None, release: Optional[str] = None, token: Optional[str] = None, dc: Optional[str] = None) -> bool: ...
|
|
def delete(self, key: str, recurse: Optional[bool] = None, cas: Optional[int] = None, token: Optional[str] = None, dc: Optional[str] = None) -> bool: ...
|
|
class Agent:
|
|
service: 'Consul.Agent.Service'
|
|
def self(self) -> Dict[str, Dict[str, Any]]: ...
|
|
class Service:
|
|
def register(self, name: str, service_id=..., address=..., port=..., tags=..., check=..., token=..., script=..., interval=..., ttl=..., http=..., timeout=..., enable_tag_override=...) -> bool: ...
|
|
def deregister(self, service_id: str) -> bool: ...
|
|
class Session:
|
|
def create(self, name: Optional[str] = None, node: Optional[str] = [], checks: Optional[List[str]]=None, lock_delay: float = 15, behavior: str = 'release', ttl: Optional[int] = None, dc: Optional[str] = None) -> str: ...
|
|
def renew(self, session_id: str, dc: Optional[str] = None) -> Optional[str]: ...
|