# Source code for firebird.lib.gstat

# SPDX-FileCopyrightText: 2020-present The Firebird Projects <www.firebirdsql.org>
#
# SPDX-License-Identifier: MIT
#
# PROGRAM/MODULE: firebird-lib
# FILE:           firebird/lib/gstat.py
# DESCRIPTION:    Module for work with Firebird gstat output
# CREATED:        6.10.2020
#
# The contents of this file are subject to the MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# Copyright (c) 2020 Firebird Project (www.firebirdsql.org)
# All Rights Reserved.
#
# Contributor(s): Pavel Císař (original code)
#                 ______________________________________
# pylint: disable=C0302, W0212, R0902, R0912,R0913, R0914, R0915, R0904, R0903

"""firebird.lib.gstat - Module for working with Firebird gstat output

"""

from __future__ import annotations
from typing import List, Tuple, Iterable, Union, Optional
import weakref
from dataclasses import dataclass
import datetime
from enum import Enum
from firebird.base.collections import DataList
from firebird.base.types import Error, STOP, Sentinel

# Value assigned to `StatDatabase.gstat_version` once the 'System Change Number'
# header item is seen (output containing a 'Checksum' item is rejected as
# older than Firebird 3).
GSTAT_30 = 3

# Type of the parsing specification tables below: list of
# `(key, value type code, attribute name)` tuples. When the attribute name is
# None, the parser derives it from the key.
TLogItemSpec = List[Tuple[str, str, Optional[str]]]

# Items of the "Database header page information" section.
# Each entry is `(line prefix, value type code, attribute name)`.
# Type codes handled by the header parser: 'i' = integer, 's' = string (kept as is),
# 'd' = timestamp in '%b %d, %Y %H:%M:%S' format, 'l' = comma separated list
# converted to `DbAttribute` members.
# When the attribute name is None, it is the lowercased prefix with spaces
# replaced by underscores. Entries are tested in order with `str.startswith`,
# so a longer prefix must precede a shorter one it begins with
# (e.g. 'Implementation ID' before 'Implementation').
items_hdr: TLogItemSpec = [
    ('Flags', 'i', None),
    ('Checksum', 'i', None),
    ('Generation', 'i', None),
    ('System Change Number', 'i', 'system_change_number'),
    ('Page size', 'i', None),
    ('ODS version', 's', None),
    ('Oldest transaction', 'i', 'oit'),
    ('Oldest active', 'i', 'oat'),
    ('Oldest snapshot', 'i', 'ost'),
    ('Next transaction', 'i', None),
    ('Bumped transaction', 'i', None),
    ('Sequence number', 'i', None),
    ('Next attachment ID', 'i', None),
    ('Implementation ID', 'i', None),
    ('Implementation', 's', None),
    ('Shadow count', 'i', None),
    ('Page buffers', 'i', None),
    ('Next header page', 'i', None),
    ('Database dialect', 'i', None),
    ('Creation date', 'd', None),
    ('Attributes', 'l', None)]

# Items of the "Variable header data" section.
# Each entry is `(line prefix, value type code, attribute name)`.
# Type codes handled by the parser: 'i' = integer, 's' = string (kept as is),
# 'd' = timestamp in '%b %d, %Y %H:%M:%S' format.
# When the attribute name is None, it is the lowercased prefix with the colon
# stripped and spaces replaced by underscores.
items_var: TLogItemSpec = [
    ('Sweep interval:', 'i', None),
    ('Continuation file:', 's', None),
    ('Last logical page:', 'i', None),
    ('Database backup GUID:', 's', 'backup_guid'),
    ('Root file name:', 's', 'root_filename'),
    ('Replay logging file:', 's', None),
    ('Backup difference file:', 's', 'backup_diff_file')]

# Items that may appear on the comma separated data lines of a table block.
# Each entry is `(item prefix, value type code, attribute name)`; values are
# stored on the current `StatTable`.
# Type codes handled by the parser: 'i' = integer, 'f' = float,
# 'p' = percentage (trailing '%' stripped, stored as integer).
# When the attribute name is None, it is the lowercased prefix with the colon
# stripped and spaces replaced by underscores (e.g. 'Level 0:' -> 'level_0').
# Prefix matching is case sensitive.
items_tbl3: TLogItemSpec = [
    ('Primary pointer page:', 'i', None),
    ('Index root page:', 'i', None),
    ('Total formats:', 'i', None),
    ('used formats:', 'i', None),
    ('Average record length:', 'f', 'avg_record_length'),
    ('total records:', 'i', None),
    ('Average version length:', 'f', 'avg_version_length'),
    ('total versions:', 'i', None),
    ('max versions:', 'i', None),
    ('Average fragment length:', 'f', 'avg_fragment_length'),
    ('total fragments:', 'i', None),
    ('max fragments:', 'i', None),
    ('Average unpacked length:', 'f', 'avg_unpacked_length'),
    ('compression ratio:', 'f', None),
    ('Pointer pages:', 'i', 'pointer_pages'),
    ('data page slots:', 'i', None),
    ('Data pages:', 'i', None),
    ('average fill:', 'p', 'avg_fill'),
    ('Primary pages:', 'i', None),
    ('secondary pages:', 'i', None),
    ('swept pages:', 'i', None),
    ('Empty pages:', 'i', None),
    ('full pages:', 'i', None),
    ('Blobs:', 'i', None),
    ('total length:', 'i', 'blobs_total_length'),
    ('blob pages:', 'i', None),
    ('Level 0:', 'i', None),
    ('Level 1:', 'i', None),
    ('Level 2:', 'i', None)]

# Items that may appear on the comma separated data lines of an index block.
# Each entry is `(item prefix, value type code, attribute name)`; values are
# stored on the current `StatIndex`.
# Type codes handled by the parser: 'i' = integer, 'f' = float,
# 'p' = percentage (trailing '%' stripped, stored as integer).
# When the attribute name is None, it is the lowercased prefix with the colon
# stripped and spaces replaced by underscores.
items_idx3: TLogItemSpec = [
    ('Root page:', 'i', None),
    ('depth:', 'i', None),
    ('leaf buckets:', 'i', None),
    ('nodes:', 'i', None),
    ('Average node length:', 'f', 'avg_node_length'),
    ('total dup:', 'i', None),
    ('max dup:', 'i', None),
    ('Average key length:', 'f', 'avg_key_length'),
    ('compression ratio:', 'f', None),
    ('Average prefix length:', 'f', 'avg_prefix_length'),
    ('average data length:', 'f', 'avg_data_length'),
    ('Clustering factor:', 'f', None),
    ('ratio:', 'f', None)]

# Labels of the fill distribution ranges as they appear before '=' on the
# "Fill distribution" lines. The position of a label in this list is the index
# of its counter in the five-element distribution collected by the parser.
items_fill: List[str] = ['0 - 19%', '20 - 39%', '40 - 59%', '60 - 79%', '80 - 99%']

class DbAttribute(Enum):
    """Database attributes stored in header page clumplets.

    The value of each member is the attribute text as listed in the
    ``Attributes`` header item, so a member can be looked up directly
    from parsed text with ``DbAttribute(text)``.
    """
    # Write mode, space reservation and cache
    WRITE = 'force write'
    NO_RESERVE = 'no reserve'
    NO_SHARED_CACHE = 'shared cache disabled'
    # Shadow
    ACTIVE_SHADOW = 'active shadow'
    # Shutdown modes
    SHUTDOWN_MULTI = 'multi-user maintenance'
    SHUTDOWN_SINGLE = 'single-user maintenance'
    SHUTDOWN_FULL = 'full shutdown'
    # Access mode
    READ_ONLY = 'read only'
    # Backup states
    BACKUP_LOCK = 'backup lock'
    BACKUP_MERGE = 'backup merge'
    BACKUP_WRONG = 'wrong backup state'
@dataclass(frozen=True)
class FillDistribution:
    """Data/Index page fill distribution.

    Immutable record with one counter per fill range. Fields are positional and
    follow the order of the ranges in `items_fill`.
    """
    # Counter for the '0 - 19%' range
    d20: int
    # Counter for the '20 - 39%' range
    d40: int
    # Counter for the '40 - 59%' range
    d60: int
    # Counter for the '60 - 79%' range
    d80: int
    # Counter for the '80 - 99%' range
    d100: int
@dataclass(frozen=True)
class Encryption:
    """Page encryption status.

    Immutable record holding the three page counters parsed from one
    encryption line of gstat output.
    """
    # Total number of pages
    pages: int
    # Number of encrypted pages
    encrypted: int
    # Number of pages that are not encrypted
    unencrypted: int
@dataclass
class _ParserState:
    """Container for parser state values.

    The fields have the same names and initial values as the private parser
    state kept by `StatDatabase`.
    """
    line_no: int = 0
    table: StatTable = None
    index: StatIndex = None
    new_block: bool = True
    in_table: bool = False
    step: int = 0


def empty_str(value: str) -> bool:
    """Return True if `value` is None or a string without any non-whitespace character.
    """
    return value is None or not value.strip()
class StatTable:
    """Statistics for a single database table.

    All values start as None and are filled in by the gstat parser; which of
    them get a value depends on what the parsed gstat output contains.
    """
    def __init__(self):
        # --- Identification ---
        #: Table name
        self.name: Optional[str] = None
        #: Table ID
        self.table_id: Optional[int] = None
        #: Primary Pointer Page of the table
        self.primary_pointer_page: Optional[int] = None
        #: Index Root Page of the table
        self.index_root_page: Optional[int] = None
        # --- Records and versions ---
        #: Average record length
        self.avg_record_length: Optional[float] = None
        #: Total number of records in table
        self.total_records: Optional[int] = None
        #: Average record version length
        self.avg_version_length: Optional[float] = None
        #: Total number of record versions
        self.total_versions: Optional[int] = None
        #: Max. number of versions for a single record
        self.max_versions: Optional[int] = None
        # --- Data pages ---
        #: Number of data pages for table
        self.data_pages: Optional[int] = None
        #: Number of data page slots for table
        self.data_page_slots: Optional[int] = None
        #: Average data page fill ratio (percent)
        self.avg_fill: Optional[int] = None
        #: Data page fill distribution statistics
        self.distribution: Optional[FillDistribution] = None
        #: Indices that belong to the table
        self.indices: DataList[StatIndex] = DataList(type_spec=StatIndex,
                                                     key_expr='item.name')
        #: Number of Pointer Pages
        self.pointer_pages: Optional[int] = None
        # --- Record formats and fragments ---
        #: Number of record formats
        self.total_formats: Optional[int] = None
        #: Number of record formats that are actually used
        self.used_formats: Optional[int] = None
        #: Average length of record fragments
        self.avg_fragment_length: Optional[float] = None
        #: Total number of record fragments
        self.total_fragments: Optional[int] = None
        #: Max. number of fragments for a single record
        self.max_fragments: Optional[int] = None
        #: Average length of unpacked record
        self.avg_unpacked_length: Optional[float] = None
        #: Record compression ratio
        self.compression_ratio: Optional[float] = None
        # --- Page categories ---
        #: Number of Primary Data Pages
        self.primary_pages: Optional[int] = None
        #: Number of Secondary Data Pages
        self.secondary_pages: Optional[int] = None
        #: Number of swept data pages
        self.swept_pages: Optional[int] = None
        #: Number of empty data pages
        self.empty_pages: Optional[int] = None
        #: Number of full data pages
        self.full_pages: Optional[int] = None
        # --- BLOBs ---
        #: Number of BLOB values
        self.blobs: Optional[int] = None
        #: Total length of BLOB values (bytes)
        self.blobs_total_length: Optional[int] = None
        #: Number of BLOB pages
        self.blob_pages: Optional[int] = None
        #: Number of Level 0 BLOB values
        self.level_0: Optional[int] = None
        #: Number of Level 1 BLOB values
        self.level_1: Optional[int] = None
        #: Number of Level 2 BLOB values
        self.level_2: Optional[int] = None
class StatIndex:
    """Statistics for a single database index.

    Arguments:
        table: Parent table object. The new index keeps only a weak proxy to it,
               and registers a weak proxy to itself in `table.indices`, so
               somebody else must hold a strong reference to the index.
    """
    def __init__(self, table):
        parent = weakref.proxy(table)
        #: weakref.proxy: Proxy to parent `.StatTable`
        self.table: weakref.ProxyType = parent
        # The table refers to its indices weakly
        table.indices.append(weakref.proxy(self))
        # --- Identification ---
        #: Index name
        self.name: Optional[str] = None
        #: Index ID
        self.index_id: Optional[int] = None
        # --- Tree shape ---
        #: Depth of index tree
        self.depth: Optional[int] = None
        #: Number of leaf index tree buckets
        self.leaf_buckets: Optional[int] = None
        #: Number of index tree nodes
        self.nodes: Optional[int] = None
        #: Average data length
        self.avg_data_length: Optional[float] = None
        # --- Duplicates ---
        #: Total number of duplicate keys
        self.total_dup: Optional[int] = None
        #: Max. number of occurrences for a single duplicate key
        self.max_dup: Optional[int] = None
        #: Index page fill distribution statistics
        self.distribution: Optional[FillDistribution] = None
        # --- Additional values ---
        #: Index Root page
        self.root_page: Optional[int] = None
        #: Average node length
        self.avg_node_length: Optional[float] = None
        #: Average key length
        self.avg_key_length: Optional[float] = None
        #: Index key compression ratio
        self.compression_ratio: Optional[float] = None
        #: Average key prefix length
        self.avg_prefix_length: Optional[float] = None
        #: Index clustering factor
        self.clustering_factor: Optional[float] = None
        #: Ratio
        self.ratio: Optional[float] = None
class StatDatabase:
    """Firebird database statistics (produced by gstat).

    The instance works as an incremental parser. Feed it the gstat output either
    at once with `parse()`, or line by line with `push()` followed by a final
    `push(STOP)`. Parsed values are stored in instance attributes and in the
    `tables` and `indices` lists. Pushing new data after `STOP` starts a new
    parse and discards previous content.
    """
    def __init__(self):
        #: GSTAT version
        self.gstat_version: Optional[int] = None
        #: System change number
        self.system_change_number: Optional[int] = None
        #: GSTAT execution timestamp
        self.executed: Optional[datetime.datetime] = None
        #: GSTAT completion timestamp
        self.completed: Optional[datetime.datetime] = None
        #: Database filename
        self.filename: Optional[str] = None
        #: Database flags
        self.flags: int = 0
        #: Database header generation
        self.generation: int = 0
        #: Database page size
        self.page_size: int = 0
        #: Oldest Interesting Transaction
        self.oit: int = 0
        #: Oldest Active Transaction
        self.oat: int = 0
        #: Oldest Snapshot Transaction
        self.ost: int = 0
        #: Next Transaction
        self.next_transaction: int = 0
        #: Next attachment ID
        self.next_attachment_id: int = 0
        #: Implementation
        self.implementation: Optional[str] = None
        #: Number of shadows
        self.shadow_count: int = 0
        #: Number of page buffers
        self.page_buffers: int = 0
        #: Next header page
        self.next_header_page: int = 0
        #: SQL Dialect
        self.database_dialect: int = 0
        #: Database creation timestamp
        self.creation_date: Optional[datetime.datetime] = None
        #: Database attributes
        self.attributes: List[DbAttribute] = []
        # Variable data
        #: Sweep interval
        self.sweep_interval: Optional[int] = None
        #: Continuation file
        self.continuation_file: Optional[str] = None
        #: Last logical page
        self.last_logical_page: Optional[int] = None
        #: Backup GUID
        self.backup_guid: Optional[str] = None
        #: Root file name
        self.root_filename: Optional[str] = None
        #: Replay logging file
        self.replay_logging_file: Optional[str] = None
        #: Backup difference file
        self.backup_diff_file: Optional[str] = None
        #: Stats for encrypted data pages
        self.encrypted_data_pages: Optional[Encryption] = None
        #: Stats for encrypted index pages
        self.encrypted_index_pages: Optional[Encryption] = None
        #: Stats for encrypted blob pages
        self.encrypted_blob_pages: Optional[Encryption] = None
        #: Database file names
        self.continuation_files: List[str] = []
        # Parser state
        self.__line_no: int = 0
        self.__table: Optional[StatTable] = None
        self.__index: Optional[StatIndex] = None
        self.__new_block: bool = True
        self.__in_table: bool = False
        # 0 = outside section, 1 = header, 2 = variable data, 3 = file sequence,
        # 4 = tables and indices, -1 = parsing finished (next push starts over)
        self.__step: int = 0
        self.__clear()

    def __clear(self):
        "Reset parsed values and parser state, and create new `tables`/`indices` lists"
        self.gstat_version = None
        self.system_change_number = None
        self.executed = None
        self.completed = None
        self.filename = None
        self.flags = 0
        self.generation = 0
        self.page_size = 0
        self.oit = 0
        self.oat = 0
        self.ost = 0
        self.next_transaction = 0
        self.next_attachment_id = 0
        self.implementation = None
        self.shadow_count = 0
        self.page_buffers = 0
        self.next_header_page = 0
        self.database_dialect = 0
        self.creation_date = None
        self.attributes.clear()
        self.sweep_interval = None
        self.continuation_file = None
        self.last_logical_page = None
        self.backup_guid = None
        self.root_filename = None
        self.replay_logging_file = None
        self.backup_diff_file = None
        self.encrypted_data_pages = None
        self.encrypted_index_pages = None
        self.encrypted_blob_pages = None
        self.continuation_files.clear()
        # Lists from previous parse may be frozen, so new ones are always created
        self.__tables: DataList[StatTable] = DataList(type_spec=StatTable,
                                                      key_expr='item.name')
        self.__indices: DataList[StatIndex] = DataList(type_spec=StatIndex,
                                                       key_expr='item.name')
        #
        self.__line_no = 0
        self.__table = None
        self.__index = None
        self.__new_block = True
        self.__in_table = False
        self.__step = 0

    def __parse_hdr(self, line: str) -> None:
        "Parse line from header"
        for key, valtype, name in items_hdr:
            if not line.startswith(key):
                continue
            # Check for GSTAT_VERSION
            if self.gstat_version is None:
                if key == 'System Change Number':
                    self.gstat_version = GSTAT_30
                elif key == 'Checksum':
                    raise Error("Output from gstat older than Firebird 3 is not supported")
            #
            value = line[len(key):].strip()
            if valtype == 'i':  # integer
                value = int(value)
            elif valtype == 's':  # string
                pass
            elif valtype == 'd':  # date time
                value = datetime.datetime.strptime(value, '%b %d, %Y %H:%M:%S')
            elif valtype == 'l':  # list
                # Empty text must be tested first, because ''.split(',') returns ['']
                value = [DbAttribute(x.strip()) for x in value.split(',')] if value else []
            else:
                raise Error(f"Unknown value type {valtype}")
            if name is None:
                name = key.lower().replace(' ', '_')
            setattr(self, name, value)
            return
        raise Error(f'Unknown information (line {self.__line_no})')

    def __parse_var(self, line: str) -> None:
        "Parse line from variable header data"
        if line == '*END*':
            return
        for key, valtype, name in items_var:
            if not line.startswith(key):
                continue
            value = line[len(key):].strip()
            if valtype == 'i':  # integer
                value = int(value)
            elif valtype == 's':  # string
                pass
            elif valtype == 'd':  # date time
                value = datetime.datetime.strptime(value, '%b %d, %Y %H:%M:%S')
            else:
                raise Error(f"Unknown value type {valtype}")
            if name is None:
                name = key.lower().strip(':').replace(' ', '_')
            setattr(self, name, value)
            return
        raise Error(f'Unknown information (line {self.__line_no})')

    def __parse_fseq(self, line: str) -> None:
        "Parse line from file sequence"
        if not line.startswith('File '):
            raise Error(f"Bad file specification (line {self.__line_no})")
        if 'is the only file' in line:
            return
        # File name is the text between 'File ' prefix and the marker
        for marker in (' is the ', ' continues as'):
            if marker in line:
                self.continuation_files.append(line[5:line.index(marker)])
                return
        raise Error(f"Bad file specification (line {self.__line_no})")

    def __parse_block_data(self, target: Union[StatTable, StatIndex],
                           items: TLogItemSpec, line: str) -> None:
        "Parse data line (values or fill distribution) of table or index block into `target`"
        if ',' in line:
            # Data values: comma separated list of "<key> <value>" items
            for item in line.split(','):
                item = item.strip()
                for key, valtype, name in items:
                    if not item.startswith(key):
                        continue
                    value = item[len(key):].strip()
                    if valtype == 'i':  # integer
                        value = int(value)
                    elif valtype == 'f':  # float
                        value = float(value)
                    elif valtype == 'p':  # %
                        value = int(value.strip('%'))
                    else:
                        raise Error(f"Unknown value type {valtype}")
                    if name is None:
                        name = key.lower().strip(':').replace(' ', '_')
                    setattr(target, name, value)
                    break
                else:
                    raise Error(f'Unknown information (line {self.__line_no})')
        elif '=' in line:
            # Fill distribution item: "<range> = <count>"
            fill_range, fill_value = line.split('=')
            i = items_fill.index(fill_range.strip())
            if target.distribution is None:
                # Collected as list, converted to FillDistribution when parsing is finished
                target.distribution = [0, 0, 0, 0, 0]
            target.distribution[i] = int(fill_value.strip())
        elif not line.startswith('Fill distribution:'):
            raise Error(f'Unknown information (line {self.__line_no})')

    def __parse_table(self, line: str) -> None:
        "Parse line from table data"
        if self.__table.name is None:
            # We should parse header: `NAME (ID)`. The split is done from the right,
            # so (quoted) name that contains ' (' does not break it.
            tname, tid = line.rsplit(' (', 1)
            self.__table.name = tname.strip(' "')
            self.__table.table_id = int(tid.strip('()'))
        else:
            self.__parse_block_data(self.__table, items_tbl3, line)

    def __parse_index(self, line: str) -> None:
        "Parse line from index data"
        if self.__index.name is None:
            # We should parse header: `Index NAME (ID)`, line[6:] skips the 'Index ' prefix.
            # The split is done from the right, so name that contains ' (' does not break it.
            iname, iid = line[6:].rsplit(' (', 1)
            self.__index.name = iname.strip(' "')
            self.__index.index_id = int(iid.strip('()'))
        else:
            self.__parse_block_data(self.__index, items_idx3, line)

    def __parse_encryption(self, line: str) -> None:
        "Parse line from encryption data"
        try:
            # Line has three comma separated parts, each ends with a number
            total, encrypted, unencrypted = (int(part.rsplit(' ', 1)[1])
                                             for part in line.split(','))
        except (ValueError, IndexError) as exc:
            raise Error(f'Malformed encryption information (line {self.__line_no})') from exc
        data = Encryption(total, encrypted, unencrypted)
        if 'Data pages:' in line:
            self.encrypted_data_pages = data
        elif 'Index pages:' in line:
            self.encrypted_index_pages = data
        elif 'Blob pages:' in line:
            self.encrypted_blob_pages = data
        else:
            raise Error(f'Unknown encryption information (line {self.__line_no})')

    def __parse_top_level(self, line: str) -> None:
        "Parse line found outside of any section (section headers and standalone items)"
        if line.startswith('Gstat execution time'):
            self.executed = datetime.datetime.strptime(line[21:], '%a %b %d %H:%M:%S %Y')
        elif line.startswith('Database header page information:'):
            self.__step = 1
        elif line.startswith('Variable header data:'):
            self.__step = 2
        elif line.startswith('Database file sequence:'):
            self.__step = 3
        elif 'encrypted' in line and 'non-crypted' in line:
            self.__parse_encryption(line)
        elif line.startswith('Analyzing database pages ...'):
            self.__step = 4
        elif empty_str(line):
            pass
        elif line.startswith('Database "'):
            # Only the first space separates the keyword from the quoted file name,
            # the file name itself may contain spaces
            _, filename = line.split(' ', 1)
            self.filename = filename.strip('"')
        else:
            raise Error(f"Unrecognized data (line {self.__line_no})")

    def __parse_stats(self, line: str) -> None:
        "Parse line from table and index statistics (blocks are separated by empty lines)"
        if empty_str(line):
            self.__new_block = True
        elif not self.__new_block:
            # Continuation of current block
            if self.__in_table:
                self.__parse_table(line)
            else:
                self.__parse_index(line)
        else:
            self.__new_block = False
            if line.startswith('Index '):
                # It's index
                if self.__table is None:
                    raise Error(f"Index without table (line {self.__line_no})")
                self.__index = StatIndex(self.__table)
                self.indices.append(self.__index)
                self.__in_table = False
                self.__parse_index(line)
            else:
                # Should be table
                self.__table = StatTable()
                self.tables.append(self.__table)
                self.__in_table = True
                self.__parse_table(line)

    def __finish(self) -> None:
        "Finish parsing: finalize fill distributions and freeze `tables` and `indices`"
        for items in (self.tables, self.indices):
            for item in items:
                # Tables/indices without fill distribution data keep None
                if isinstance(item.distribution, list):
                    item.distribution = FillDistribution(*item.distribution)
        self.tables.freeze()
        self.indices.freeze()
        self.__step = -1

    def has_table_stats(self) -> bool:
        """Returns True if instance contains information about tables.

        .. important::

           This is not the same as check for empty :data:`tables` list. When gstat is run
           with `-i` without `-d` option, :data:`tables` list contains instances that do
           not have any other information about table but table name and its indices.
        """
        return len(self.tables) > 0 and self.tables[0].primary_pointer_page is not None

    def has_row_stats(self) -> bool:
        """Returns True if instance contains information about table rows.
        """
        return self.has_table_stats() and self.tables[0].avg_version_length is not None

    def has_index_stats(self) -> bool:
        """Returns True if instance contains information about indices.
        """
        return len(self.indices) > 0 and self.indices[0].depth is not None

    def has_encryption_stats(self) -> bool:
        """Returns True if instance contains information about database encryption.
        """
        return self.encrypted_data_pages is not None

    def has_system(self) -> bool:
        """Returns True if instance contains information about system tables
        (i.e. `tables` contains a table with name that starts with `RDB$DATABASE`).
        """
        return self.tables.contains("item.name.startswith('RDB$DATABASE')")

    def parse(self, lines: Iterable[str]) -> None:
        """Parses gstat output.

        Arguments:
            lines: Iterable that return lines from database analysis produced by Firebird
                   gstat.

        Raises:
            Error: When a line is not recognized or is malformed.
        """
        for line in lines:
            self.push(line)
        self.push(STOP)

    def push(self, line: Union[str, Sentinel]) -> None:
        """Push parser.

        Arguments:
            line: Single gstat output line, or `~firebird.base.types.STOP` sentinel.

        The `STOP` sentinel finishes the parsing: collected fill distributions are
        converted to `FillDistribution` and `tables` and `indices` lists are frozen.
        The first push after `STOP` discards all previously parsed data.

        Raises:
            Error: When the line is not recognized or is malformed.
        """
        if self.__step == -1:
            self.__clear()
        if line is STOP:
            self.__finish()
            return
        line = line.strip()
        self.__line_no += 1
        if line.startswith('Gstat completion time'):
            # May appear in any section
            self.completed = datetime.datetime.strptime(line[22:], '%a %b %d %H:%M:%S %Y')
        elif self.__step == 0:  # Looking for section or self name
            self.__parse_top_level(line)
        elif self.__step in (1, 2, 3):
            if empty_str(line):  # section ends with empty line
                self.__step = 0
            elif self.__step == 1:  # Header
                self.__parse_hdr(line)
            elif self.__step == 2:  # Variable data
                self.__parse_var(line)
            else:  # File sequence
                self.__parse_fseq(line)
        elif self.__step == 4:  # Tables and indices
            self.__parse_stats(line)

    @property
    def tables(self) -> DataList[StatTable]:
        """`~firebird.base.collections.DataList` of `.StatTable` instances.
        """
        return self.__tables

    @property
    def indices(self) -> DataList[StatIndex]:
        """`~firebird.base.collections.DataList` of `StatIndex` instances.
        """
        return self.__indices