Source code for firebird.lib.trace

# SPDX-FileCopyrightText: 2020-present The Firebird Projects <www.firebirdsql.org>
#
# SPDX-License-Identifier: MIT
#
# PROGRAM/MODULE: firebird-lib
# FILE:           firebird/lib/trace.py
# DESCRIPTION:    Module for parsing Firebird trace & audit protocol
# CREATED:        7.10.2020
#
# The contents of this file are subject to the MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# Copyright (c) 2020 Firebird Project (www.firebirdsql.org)
# All Rights Reserved.
#
# Contributor(s): Pavel Císař (original code)
#                 ______________________________________
# pylint: disable=C0302, W0212, R0902, R0912,R0913, R0914, R0915, R0904, R0903, C0301, W0703

"""firebird.lib.trace - Module for parsing Firebird trace & audit protocol


"""

from __future__ import annotations
from typing import List, Tuple, Dict, Set, Iterable, Any, Optional, Union
from sys import intern
import datetime
import decimal
import collections
from enum import Enum, IntEnum, auto
from dataclasses import dataclass
from firebird.base.types import Error, STOP, Sentinel

[docs] class Status(Enum): """Trace event status codes. """ OK = ' ' FAILED = 'F' UNAUTHORIZED = 'U' UNKNOWN = '?'
[docs] class Event(IntEnum): """Trace event codes. """ UNKNOWN = auto() TRACE_INIT = auto() TRACE_SUSPENDED = auto() TRACE_FINI = auto() CREATE_DATABASE = auto() DROP_DATABASE = auto() ATTACH_DATABASE = auto() DETACH_DATABASE = auto() START_TRANSACTION = auto() COMMIT_TRANSACTION = auto() ROLLBACK_TRANSACTION = auto() COMMIT_RETAINING = auto() ROLLBACK_RETAINING = auto() PREPARE_STATEMENT = auto() EXECUTE_STATEMENT_START = auto() EXECUTE_STATEMENT_FINISH = auto() FREE_STATEMENT = auto() CLOSE_CURSOR = auto() EXECUTE_TRIGGER_START = auto() EXECUTE_TRIGGER_FINISH = auto() EXECUTE_FUNCTION_START = auto() EXECUTE_FUNCTION_FINISH = auto() EXECUTE_PROCEDURE_START = auto() EXECUTE_PROCEDURE_FINISH = auto() START_SERVICE = auto() ATTACH_SERVICE = auto() DETACH_SERVICE = auto() QUERY_SERVICE = auto() SET_CONTEXT = auto() ERROR = auto() WARNING = auto() SWEEP_START = auto() SWEEP_PROGRESS = auto() SWEEP_FINISH = auto() SWEEP_FAILED = auto() COMPILE_BLR = auto() EXECUTE_BLR = auto() EXECUTE_DYN = auto()
[docs] class TraceInfo: """Base class for trace info blocks. """
[docs] class TraceEvent: """Base class for trace events. """
[docs] @dataclass(frozen=True) class AttachmentInfo(TraceInfo): """Information about database attachment. """ #: Attachamnet ID attachment_id: int #: Database name/file database: str #: Database character set charset: str #: Network protocol protocol: str #: Network address address: str #: User name user: str #: Role name role: str #: Remote process remote_process: str #: Remote process ID remote_pid: int
[docs] @dataclass(frozen=True) class TransactionInfo(TraceInfo): """Information about transaction. """ #: Attachamnet ID attachment_id: int #: Transaction ID transaction_id: int #: Initial transaction ID (for retained ones) initial_id: int #: List of transaction options options: List[str]
[docs] @dataclass(frozen=True) class ServiceInfo(TraceInfo): """Information about service attachment. """ #: Service ID service_id: int #: User name user: str #: Network protocol protocol: str #: Network address address: str #: Remote process remote_process: str #: Remote process ID remote_pid: int
[docs] @dataclass(frozen=True) class SQLInfo(TraceInfo): """Information about SQL statement. """ #: SQL ID sql_id: int #: SQL command sql: str #: Execution plan plan: str
[docs] @dataclass(frozen=True) class ParamSet(TraceInfo): """Information about set of parameters. """ #: Parameter set ID par_id: int #: List of parameters (name, value pairs) params: List[Tuple[str, Any]]
[docs] @dataclass(frozen=True) class AccessStats(TraceInfo): """Table access statistics. """ #: Table name table: str #: Number of rows accessed sequentially natural: int #: Number of rows accessed via index index: int #: Number of updated rows update: int #: Number of inserted rows insert: int #: Number of deleted rows delete: int #: Number of rows where a new primary record version or a change to an existing #: primary record version is backed out due to rollback or savepoint undo backout: int #: Number of rows where record version chain is being purged of versions no longer #: needed by OAT or younger transactions purge: int #: Number of rows where record version chain is being deleted due to deletions #: by transactions older than OAT expunge: int
#
[docs] @dataclass(frozen=True) class EventTraceInit(TraceEvent): "Trace session initialized trace event" #: Trace event ID event_id: int #: Timestamp when the event occurred timestamp: datetime.datetime #: Trace session name session_name: str
[docs] @dataclass(frozen=True) class EventTraceSuspend(TraceEvent): "Trace session suspended trace event" #: Trace event ID event_id: int #: Timestamp when the event occurred timestamp: datetime.datetime #: Trace session name session_name: str
[docs] @dataclass(frozen=True) class EventTraceFinish(TraceEvent): "Trace session finished trace event" #: Trace event ID event_id: int #: Timestamp when the event occurred timestamp: datetime.datetime #: Trace session name session_name: str
#
[docs] @dataclass(frozen=True) class EventCreate(TraceEvent): "Create database trace event" #: Trace event ID event_id: int #: Timestamp when the event occurred timestamp: datetime.datetime #: Event status status: Status #: Database attachent ID attachment_id: int #: Database name/file database: str #: Connection character set charset: str #: Netwrok protocol protocol: str #: Netwrok address address: str #: User name user: str #: Role name role: str #: Remote process remote_process: str #: Remote process remote_pid: int
[docs] @dataclass(frozen=True) class EventDrop(TraceEvent): "Drop database trace event" #: Trace event ID event_id: int #: Timestamp when the event occurred timestamp: datetime.datetime #: Event status status: Status #: Database attachent ID attachment_id: int #: Database name/file database: str #: Connection character set charset: str #: Netwrok protocol protocol: str #: Netwrok address address: str #: User name user: str #: Role name role: str #: Remote process remote_process: str #: Remote process remote_pid: int
[docs] @dataclass(frozen=True) class EventAttach(TraceEvent): "Database attach trace event" #: Trace event ID event_id: int #: Timestamp when the event occurred timestamp: datetime.datetime #: Event status status: Status #: Database attachent ID attachment_id: int #: Database name/file database: str #: Connection character set charset: str #: Netwrok protocol protocol: str #: Netwrok address address: str #: User name user: str #: Role name role: str #: Remote process remote_process: str #: Remote process remote_pid: int
[docs] @dataclass(frozen=True) class EventDetach(TraceEvent): "Database detach trace event" #: Trace event ID event_id: int #: Timestamp when the event occurred timestamp: datetime.datetime #: Event status status: Status #: Database attachent ID attachment_id: int #: Database name/file database: str #: Connection character set charset: str #: Netwrok protocol protocol: str #: Netwrok address address: str #: User name user: str #: Role name role: str #: Remote process remote_process: str #: Remote process remote_pid: int
#
[docs] @dataclass(frozen=True) class EventTransactionStart(TraceEvent): "Transaction start trace event" #: Trace event ID event_id: int #: Timestamp when the event occurred timestamp: datetime.datetime #: Event status status: Status #: Database attachent ID attachment_id: int #: Transaction ID transaction_id: int #: List of transaction options options: List[str]
[docs] @dataclass(frozen=True) class EventCommit(TraceEvent): "Commit trace event" #: Trace event ID event_id: int #: Timestamp when the event occurred timestamp: datetime.datetime #: Event status status: Status #: Database attachent ID attachment_id: int #: Transaction ID transaction_id: int #: List of transaction options options: List[str] #: Execution time in ms run_time: int #: Number of page reads reads: int #: Number of page writes writes: int #: Number of page fetches fetches: int #: Number of pages with changes pending marks: int
[docs] @dataclass(frozen=True) class EventRollback(TraceEvent): "Rollback trace event" #: Trace event ID event_id: int #: Timestamp when the event occurred timestamp: datetime.datetime #: Event status status: Status #: Database attachent ID attachment_id: int #: Transaction ID transaction_id: int #: List of transaction options options: List[str] #: Execution time in ms run_time: int #: Number of page reads reads: int #: Number of page writes writes: int #: Number of page fetches fetches: int #: Number of pages with changes pending marks: int
[docs] @dataclass(frozen=True) class EventCommitRetaining(TraceEvent): "Commit retaining trace event" #: Trace event ID event_id: int #: Timestamp when the event occurred timestamp: datetime.datetime #: Event status status: Status #: Database attachent ID attachment_id: int #: Transaction ID transaction_id: int #: List of transaction options options: List[str] #: New transaction number new_transaction_id: int #: Execution time in ms run_time: int #: Number of page reads reads: int #: Number of page writes writes: int #: Number of page fetches fetches: int #: Number of pages with changes pending marks: int
[docs] @dataclass(frozen=True) class EventRollbackRetaining(TraceEvent): "Rollback retaining trace event" #: Trace event ID event_id: int #: Timestamp when the event occurred timestamp: datetime.datetime #: Event status status: Status #: Database attachent ID attachment_id: int #: Transaction ID transaction_id: int #: List of transaction options options: List[str] #: New transaction number new_transaction_id: int #: Execution time in ms run_time: int #: Number of page reads reads: int #: Number of page writes writes: int #: Number of page fetches fetches: int #: Number of pages with changes pending marks: int
#
[docs] @dataclass(frozen=True) class EventPrepareStatement(TraceEvent): "Prepare statement trace event" #: Trace event ID event_id: int #: Timestamp when the event occurred timestamp: datetime.datetime #: Event status status: Status #: Database attachent ID attachment_id: int #: Transaction ID transaction_id: int #: Statement ID statement_id: int #: SQL ID (SQLInfo) sql_id: int #: Statement prepare time in ms prepare_time: int
[docs] @dataclass(frozen=True) class EventStatementStart(TraceEvent): "Statement start trace event" #: Trace event ID event_id: int #: Timestamp when the event occurred timestamp: datetime.datetime #: Event status status: Status #: Database attachent ID attachment_id: int #: Transaction ID transaction_id: int #: Statement ID statement_id: int #: SQL ID (SQLInfo) sql_id: int #: Param set ID (ParamSet) param_id: int
[docs] @dataclass(frozen=True) class EventStatementFinish(TraceEvent): "Statement finish trace event" #: Trace event ID event_id: int #: Timestamp when the event occurred timestamp: datetime.datetime #: Event status status: Status #: Database attachent ID attachment_id: int #: Transaction ID transaction_id: int #: Statement ID statement_id: int #: SQL ID (SQLInfo) sql_id: int #: Param set ID (ParamSet) param_id: int #: Number of affected rows records: int #: Execution time in ms run_time: int #: Number of page reads reads: int #: Number of page writes writes: int #: Number of page fetches fetches: int #: Number of pages with changes pending marks: int #: List with table access statistics access: List[AccessStats]
[docs] @dataclass(frozen=True) class EventFreeStatement(TraceEvent): "Free statement trace event" #: Trace event ID event_id: int #: Timestamp when the event occurred timestamp: datetime.datetime #: Database attachent ID attachment_id: int ##: Transaction ID #transaction_id: int #: Statement ID statement_id: int #: SQL ID (SQLInfo) sql_id: int
[docs] @dataclass(frozen=True) class EventCloseCursor(TraceEvent): "Close cursor trace event" #: Trace event ID event_id: int #: Timestamp when the event occurred timestamp: datetime.datetime #: Database attachent ID attachment_id: int ##: Transaction ID #transaction_id: int #: Statement ID statement_id: int #: SQL ID (SQLInfo) sql_id: int
#
[docs] @dataclass(frozen=True) class EventTriggerStart(TraceEvent): "Trigger start trace event" #: Trace event ID event_id: int #: Timestamp when the event occurred timestamp: datetime.datetime #: Event status status: Status #: Database attachent ID attachment_id: int #: Transaction ID transaction_id: int #: trigger name trigger: str #: Table name table: str #: Trigger event event: str
[docs] @dataclass(frozen=True) class EventTriggerFinish(TraceEvent): "Trigger finish trace event" #: Trace event ID event_id: int #: Timestamp when the event occurred timestamp: datetime.datetime #: Event status status: Status #: Database attachent ID attachment_id: int #: Transaction ID transaction_id: int #: trigger name trigger: str #: Table name table: str #: Trigger event event: str #: Execution time in ms run_time: int #: Number of page reads reads: int #: Number of page writes writes: int #: Number of page fetches fetches: int #: Number of pages with changes pending marks: int #: List with table access statistics access: List[AccessStats]
#
[docs] @dataclass(frozen=True) class EventProcedureStart(TraceEvent): "Procedure start trace event" #: Trace event ID event_id: int #: Timestamp when the event occurred timestamp: datetime.datetime #: Event status status: Status #: Database attachent ID attachment_id: int #: Transaction ID transaction_id: int #: procedure name procedure: str #: Param set ID (ParamSet) param_id: int
[docs] @dataclass(frozen=True) class EventProcedureFinish(TraceEvent): "Procedure finish trace event" #: Trace event ID event_id: int #: Timestamp when the event occurred timestamp: datetime.datetime #: Event status status: Status #: Database attachent ID attachment_id: int #: Transaction ID transaction_id: int #: procedure name procedure: str #: Param set ID (ParamSet) param_id: int #: Number of affected rows records: int #: Execution time in ms run_time: int #: Number of page reads reads: int #: Number of page writes writes: int #: Number of page fetches fetches: int #: Number of pages with changes pending marks: int #: List with table access statistics access: List[AccessStats]
#
[docs] @dataclass(frozen=True) class EventFunctionStart(TraceEvent): "Function start trace event" #: Trace event ID event_id: int #: Timestamp when the event occurred timestamp: datetime.datetime #: Event status status: Status #: Database attachent ID attachment_id: int #: Transaction ID transaction_id: int #: procedure name function: str #: Param set ID (ParamSet) param_id: int
[docs] @dataclass(frozen=True) class EventFunctionFinish(TraceEvent): "Function finish trace event" #: Trace event ID event_id: int #: Timestamp when the event occurred timestamp: datetime.datetime #: Event status status: Status #: Database attachent ID attachment_id: int #: Transaction ID transaction_id: int #: procedure name function: str #: Param set ID (ParamSet) param_id: int #: Return value returns: Tuple[str, Any] #: Execution time in ms run_time: int #: Number of page reads reads: int #: Number of page writes writes: int #: Number of page fetches fetches: int #: Number of pages with changes pending marks: int #: List with table access statistics access: List[AccessStats]
#
[docs] @dataclass(frozen=True) class EventServiceAttach(TraceEvent): "Service attach trace event" #: Trace event ID event_id: int #: Timestamp when the event occurred timestamp: datetime.datetime #: Event status status: Status #: Service ID service_id: int
[docs] @dataclass(frozen=True) class EventServiceDetach(TraceEvent): "Service detach trace event" #: Trace event ID event_id: int #: Timestamp when the event occurred timestamp: datetime.datetime #: Event status status: Status #: Service ID service_id: int
[docs] @dataclass(frozen=True) class EventServiceStart(TraceEvent): "Service start trace event" #: Trace event ID event_id: int #: Timestamp when the event occurred timestamp: datetime.datetime #: Event status status: Status #: Service ID service_id: int #: Action performed by service action: str #: List of action parameters parameters: List[str]
[docs] @dataclass(frozen=True) class EventServiceQuery(TraceEvent): "Service query trace event" #: Trace event ID event_id: int #: Timestamp when the event occurred timestamp: datetime.datetime #: Event status status: Status #: Service ID service_id: int #: Action performed by service action: str #: List of sent items sent: List[str] #: List of received items received: List[str]
#
[docs] @dataclass(frozen=True) class EventSetContext(TraceEvent): "Set context variable trace event" #: Trace event ID event_id: int #: Timestamp when the event occurred timestamp: datetime.datetime #: Database attachent ID attachment_id: int #: Transaction ID transaction_id: int #: Context name context: str #: Key key: str #: Value value: str
#
[docs] @dataclass(frozen=True) class EventError(TraceEvent): "Error trace event" #: Trace event ID event_id: int #: Timestamp when the event occurred timestamp: datetime.datetime #: Database attachent ID attachment_id: int #: Place where error occured place: str #: Error details details: List[str]
[docs] @dataclass(frozen=True) class EventWarning(TraceEvent): "Warning trace event" #: Trace event ID event_id: int #: Timestamp when the event occurred timestamp: datetime.datetime #: Database attachent ID attachment_id: int #: Place where warning occured place: str #: Warning details details: List[str]
[docs] @dataclass(frozen=True) class EventServiceError(TraceEvent): "Service error trace event" #: Trace event ID event_id: int #: Timestamp when the event occurred timestamp: datetime.datetime #: Service ID service_id: int #: Place where error occured place: str #: Error details details: List[str]
[docs] @dataclass(frozen=True) class EventServiceWarning(TraceEvent): "Service warning trace event" #: Trace event ID event_id: int #: Timestamp when the event occurred timestamp: datetime.datetime #: Service ID service_id: int #: Place where warning occured place: str #: Warning details details: List[str]
#
[docs] @dataclass(frozen=True) class EventSweepStart(TraceEvent): "Sweep start trace event" #: Trace event ID event_id: int #: Timestamp when the event occurred timestamp: datetime.datetime #: Database attachent ID attachment_id: int #: Transaction ID of the oldest [interesting] transaction oit: int #: Transaction ID of the oldest active transaction oat: int #: Transaction ID of the Oldest Snapshot ost: int #: Transaction ID of the next transaction that will be started next: int
[docs] @dataclass(frozen=True) class EventSweepProgress(TraceEvent): "Sweep progress trace event" #: Trace event ID event_id: int #: Timestamp when the event occurred timestamp: datetime.datetime #: Database attachent ID attachment_id: int #: Execution time in ms run_time: int #: Number of page reads reads: int #: Number of page writes writes: int #: Number of page fetches fetches: int #: Number of pages with changes pending marks: int #: List with table access statistics access: List[AccessStats]
[docs] @dataclass(frozen=True) class EventSweepFinish(TraceEvent): "Sweep finished trace event" #: Trace event ID event_id: int #: Timestamp when the event occurred timestamp: datetime.datetime #: Database attachent ID attachment_id: int #: Transaction ID of the oldest [interesting] transaction oit: int #: Transaction ID of the oldest active transaction oat: int #: Transaction ID of the Oldest Snapshot ost: int #: Transaction ID of the next transaction that will be started next: int #: Execution time in ms run_time: int #: Number of page reads reads: int #: Number of page writes writes: int #: Number of page fetches fetches: int #: Number of pages with changes pending marks: int #: List with table access statistics access: List[AccessStats]
[docs] @dataclass(frozen=True) class EventSweepFailed(TraceEvent): "Sweep failed trace event" #: Trace event ID event_id: int #: Timestamp when the event occurred timestamp: datetime.datetime #: Database attachent ID attachment_id: int
#
[docs] @dataclass(frozen=True) class EventBLRCompile(TraceEvent): "BLR compile trace event" #: Trace event ID event_id: int #: Timestamp when the event occurred timestamp: datetime.datetime #: Event status status: Status #: Database attachent ID attachment_id: int #: Statement ID statement_id: int #: BLR content content: str #: Prepare time in ms prepare_time: int
[docs] @dataclass(frozen=True) class EventBLRExecute(TraceEvent): "BLR execution trace event" #: Trace event ID event_id: int #: Timestamp when the event occurred timestamp: datetime.datetime #: Event status status: Status #: Database attachent ID attachment_id: int #: Transaction ID transaction_id: int #: Statement ID statement_id: int #: BLR content content: str #: Execution time in ms run_time: int #: Number of page reads reads: int #: Number of page writes writes: int #: Number of page fetches fetches: int #: Number of pages with changes pending marks: int #: List with table access statistics access: List[AccessStats]
[docs] @dataclass(frozen=True) class EventDYNExecute(TraceEvent): "DYN execution trace event" #: Trace event ID event_id: int #: Timestamp when the event occurred timestamp: datetime.datetime #: Event status status: Status #: Database attachent ID attachment_id: int #: Transaction ID transaction_id: int #: DYN content content: str #: Execution time in ms run_time: int
#
[docs] @dataclass(frozen=True) class EventUnknown(TraceEvent): "Uknown trace event" #: Trace event ID event_id: int #: Timestamp when the event occurred timestamp: datetime.datetime #: Event data data: str
def safe_int(str_value: str, base: int=10): """Always returns integer value from string/None argument. Returns 0 if argument is None. """ if str_value: return int(str_value, base) return 0
[docs] class TraceParser: """Parser for standard textual trace log. Produces dataclasses describing individual trace log entries/events. """ def __init__(self): #: Set of attachment ids that were already processed self.seen_attachments: Set[int] = set() #: Set of transaction ids that were already processed self.seen_transactions: Set[int] = set() #: Set of service ids that were already processed self.seen_services: set[int] = set() #: Dictionary that maps (sql_cmd, plan) keys to internal ids self.sqlinfo_map: Dict[Tuple[str, str], int]= {} #: Dictionary that maps parameters (statement or procedure) keys to internal ids self.param_map = {} #: Sequence id that would be assigned to next parsed event (starts with 1) self.next_event_id: int = 1 #: Sequence id that would be assigned to next parsed unique SQL command (starts with 1) self.next_sql_id: int = 1 #: Sequence id that would be assigned to next parsed unique parameter (starts with 1) self.next_param_id: int = 1 #: Parsing option indicating that parsed trace contains `FREE_STATEMENT` events. #: This has impact on cleanup of `SQLInfo` ID cache. When True, the SQLInfo is #: discarded when its FREE_STATEMENT event is processed. When False, the SQLInfo #: is discarded when its EXECUTE_STATEMENT_FINISH is processed. self.has_statement_free: bool = True # self.__infos: collections.deque = collections.deque() self.__pushed: List[str] = [] self.__current_block: collections.deque = collections.deque() self.__last_timestamp: datetime.datetime = None self.__event_values: Dict[str, Any] = {} self.__parse_map = {Event.TRACE_INIT: self.__parser_trace_init, Event.TRACE_FINI: self.__parser_trace_finish, Event.START_TRANSACTION: self.__parser_start_transaction, Event.COMMIT_TRANSACTION: self.__parser_commit_transaction, Event.ROLLBACK_TRANSACTION: self.__parser_rollback_transaction, Event.COMMIT_RETAINING: self.__parser_commit_retaining, Event.ROLLBACK_RETAINING: self.__parser_rollback_retaining, Event.PREPARE_STATEMENT: self.__parser_prepare_statement, Event.EXECUTE_STATEMENT_START: self.__parser_execute_statement_start, Event.EXECUTE_STATEMENT_FINISH: self.__parser_execute_statement_finish, Event.FREE_STATEMENT: self.__parser_free_statement, Event.CLOSE_CURSOR: self.__parser_close_cursor, Event.EXECUTE_TRIGGER_START: self.__parser_trigger_start, Event.EXECUTE_TRIGGER_FINISH: self.__parser_trigger_finish, Event.EXECUTE_FUNCTION_START: self.__parser_func_start, Event.EXECUTE_FUNCTION_FINISH: self.__parser_func_finish, Event.EXECUTE_PROCEDURE_START: self.__parser_procedure_start, Event.EXECUTE_PROCEDURE_FINISH: self.__parser_procedure_finish, Event.CREATE_DATABASE: self.__parser_create_db, Event.DROP_DATABASE: self.__parser_drop_db, Event.ATTACH_DATABASE: self.__parser_attach, Event.DETACH_DATABASE: self.__parser_detach, Event.START_SERVICE: self.__parser_service_start, Event.ATTACH_SERVICE: self.__parser_service_attach, Event.DETACH_SERVICE: self.__parser_service_detach, Event.QUERY_SERVICE: self.__parser_service_query, Event.SET_CONTEXT: self.__parser_set_context, Event.ERROR: self.__parser_error, Event.WARNING: self.__parser_warning, Event.SWEEP_START: self.__parser_sweep_start, Event.SWEEP_PROGRESS: self.__parser_sweep_progress, Event.SWEEP_FINISH: self.__parser_sweep_finish, Event.SWEEP_FAILED: self.__parser_sweep_failed, Event.COMPILE_BLR: self.__parser_blr_compile, Event.EXECUTE_BLR: self.__parser_blr_execute, Event.EXECUTE_DYN: self.__parser_dyn_execute, Event.UNKNOWN: self.__parser_unknown} def _is_entry_header(self, line: str) -> bool: items = line.split() try: datetime.datetime.strptime(items[0], '%Y-%m-%dT%H:%M:%S.%f') return True except Exception: return False def _is_session_suspended(self, line: str) -> bool: return line.rfind('is suspended as its log is full ---') >= 0 def _is_plan_separator(self, line: str) -> bool: return line == '^' * 79 def _is_perf_start(self, line: str) -> bool: result = line.endswith(' records fetched') if result: result = line[:-len(' records fetched')].isdigit() return result def _is_blr_perf_start(self, line: str) -> bool: parts = line.split() return 'ms' in parts or 'fetch(es)' in parts or 'mark(s)' in parts or 'read(s)' in parts or 'write(s)' in parts def _is_param_start(self, line: str) -> bool: return line.startswith('param0 = ') def _iter_trace_blocks(self, ilines): lines = [] for line in ilines: line = line.strip() if line: if not lines: if self._is_entry_header(line): lines.append(line) else: if self._is_entry_header(line) or self._is_session_suspended(line): yield lines lines = [line] else: lines.append(line) if lines: yield lines def _identify_event(self, line: str) -> Event: items = line.split() if (len(items) == 3) or (items[2] in ('ERROR', 'WARNING')): return Event.__members__.get(items[2], Event.UNKNOWN) if items[2] == 'UNAUTHORIZED': return Event.__members__.get(items[3], Event.UNKNOWN) if items[2] == 'FAILED': return Event.__members__.get(items[3], Event.UNKNOWN) if items[2] == 'Unknown': return Event.UNKNOWN raise Error(f'Unrecognized event header: "{line}"') def _parse_attachment_info(self, values: Dict[str, Any], check: bool=True) -> None: database, _, attachment = self.__current_block.popleft().partition(' (') values['database'] = intern(database) attachment_id, user_role, charset, protocol_address = attachment.strip('()').split(',') _, attachment_id = attachment_id.split('_') values['attachment_id'] = int(attachment_id) values['charset'] = intern(charset.strip()) protocol_address = protocol_address.strip() if protocol_address == '<internal>': protocol = address = protocol_address else: protocol, address = protocol_address.split(':', 1) values['protocol'] = intern(protocol) values['address'] = intern(address) if ':' in user_role: user, role = user_role.strip().split(':') else: user = user_role.strip() role = 'NONE' values['user'] = intern(user) values['role'] = intern(role) if protocol_address == '<internal>': values['remote_process'] = None values['remote_pid'] = None elif len(self.__current_block) > 0 and not (self.__current_block[0].startswith('(TRA') or ' ms,' in self.__current_block[0] or 'Transaction counters:' in self.__current_block[0]): # This could be actually part of error message or separator line, not remote process spec values['remote_process'] = None values['remote_pid'] = None remote_process_id = self.__current_block[0] if not remote_process_id.startswith('---'): remote_process, remote_pid = remote_process_id.rsplit(':', 1) if remote_pid.isdigit(): # it looks like we have genuine remote process info values['remote_process'] = intern(remote_process) values['remote_pid'] = int(remote_pid) self.__current_block.popleft() else: values['remote_process'] = None values['remote_pid'] = None # if check and values['attachment_id'] not in self.seen_attachments: self.__infos.append(AttachmentInfo(**values)) self.seen_attachments.add(values['attachment_id']) def _parse_transaction_info(self, values: Dict[str, Any], check: bool=True) -> None: # Transaction parameters items = self.__current_block.popleft().strip('\t ()').split(',') if len(items) == 2: transaction_id, transaction_options = items initial_id = None else: transaction_id, initial_id, transaction_options = items initial_id = int(initial_id[6:]) _, transaction_id = transaction_id.split('_') values['transaction_id'] = int(transaction_id) values['options'] = [intern(x.strip()) for x in transaction_options.split('|')] values['initial_id'] = initial_id if check and values['transaction_id'] not in self.seen_transactions: self.__infos.append(TransactionInfo(**values)) del values['initial_id'] self.seen_transactions.add(values['transaction_id']) def _parse_transaction_performance(self) -> None: self.__event_values['run_time'] = None self.__event_values['reads'] = None self.__event_values['writes'] = None self.__event_values['fetches'] = None self.__event_values['marks'] = None if self.__current_block: for value in self.__current_block.popleft().split(','): value, val_type = value.split() if 'ms' in val_type: self.__event_values['run_time'] = int(value) elif 'read' in val_type: self.__event_values['reads'] = int(value) elif 'write' in val_type: self.__event_values['writes'] = int(value) elif 'fetch' in val_type: self.__event_values['fetches'] = int(value) elif 'mark' in val_type: self.__event_values['marks'] = int(value) else: raise Error(f"Unhandled performance parameter {val_type}") def _parse_attachment_and_transaction(self) -> None: # Attachment att_values = {} self._parse_attachment_info(att_values) # Transaction tr_values = {} tr_values['attachment_id'] = att_values['attachment_id'] self._parse_transaction_info(tr_values) self.__event_values['attachment_id'] = tr_values['attachment_id'] self.__event_values['transaction_id'] = tr_values['transaction_id'] def _parse_statement_id(self) -> None: self.__event_values['plan'] = None self.__event_values['sql'] = None line = self.__current_block.popleft() if line.startswith('Statement'): stmt_id = line.split()[1] self.__event_values['statement_id'] = int(stmt_id[:-1]) if self.__event_values['status'] == Status.FAILED: return line = self.__current_block.popleft() else: self.__event_values['statement_id'] = 0 if line != '-'*79: raise Error("Separator '-'*79 line expected") def _parse_blr_statement_id(self) -> None: line = self.__current_block[0].strip() if line.startswith('Statement ') and line[-1] == ':': _, stmt_id = self.__current_block.popleft().split() self.__event_values['statement_id'] = int(stmt_id[:-1]) else: self.__event_values['statement_id'] = None def _parse_blrdyn_content(self) -> None: if self.__current_block[0] == '-' * 79: self.__current_block.popleft() content = [] line = self.__current_block.popleft() while line and not self._is_blr_perf_start(line): content.append(line) line = self.__current_block.popleft() if self.__current_block else None if line: self.__current_block.appendleft(line) self.__event_values['content'] = '\n'.join(content) else: self.__event_values['content'] = None def _parse_prepare_time(self) -> None: if self.__current_block and self.__current_block[-1].endswith(' ms'): time, _ = self.__current_block.pop().split() self.__event_values['prepare_time'] = int(time) else: self.__event_values['prepare_time'] = None def _parse_sql_statement(self) -> None: if self.__current_block: sql = [] line = self.__current_block.popleft() while line and not (self._is_plan_separator(line) or self._is_perf_start(line) or self._is_param_start(line)): sql.append(line) line = self.__current_block.popleft() if self.__current_block else None if line: self.__current_block.appendleft(line) self.__event_values['sql'] = intern('\n'.join(sql)) def _parse_plan(self) -> None: if self.__current_block: line = self.__current_block.popleft() if self._is_perf_start(line) or self._is_param_start(line): self.__current_block.appendleft(line) return if not self._is_plan_separator(line): raise Error("Separator '^'*79 line expected") plan = [] line = self.__current_block.popleft() while line and not (self._is_perf_start(line) or self._is_param_start(line)): plan.append(line) line = self.__current_block.popleft() if self.__current_block else None if line: self.__current_block.appendleft(line) self.__event_values['plan'] = intern('\n'.join(plan)) def _parse_value_spec(self, param_def: str) -> Tuple[str, Any]: param_type, param_value = param_def.split(',', 1) param_type = intern(param_type) param_value = param_value.strip(' "') if param_value == '<NULL>': param_value = None elif param_type in ('smallint', 'integer', 'bigint'): param_value = int(param_value) elif param_type == 'timestamp': param_value = datetime.datetime.strptime(param_value, '%Y-%m-%dT%H:%M:%S.%f') elif param_type == 'date': param_value = datetime.datetime.strptime(param_value, '%Y-%m-%d') elif param_type == 'time': param_value = datetime.datetime.strptime(param_value, '%H:%M:%S.%f') elif param_type in ('float', 'double precision'): param_value = decimal.Decimal(param_value) return (param_type, param_value,) def _parse_parameters_block(self) -> List[Tuple[str, Any]]: parameters = [] while self.__current_block and self.__current_block[0].startswith('param'): line = self.__current_block.popleft() _, param_def = line.split(' = ') parameters.append(self._parse_value_spec(param_def)) return parameters def _parse_parameters(self, for_procedure: bool=False) -> None: parameters = self._parse_parameters_block() while self.__current_block and self.__current_block[0].endswith('more arguments skipped...'): self.__current_block.popleft() # param_id = None if parameters: key = tuple(parameters) if key in self.param_map: param_id = self.param_map[key] else: param_id = self.next_param_id self.next_param_id += 1 self.param_map[key] = param_id self.__infos.append(ParamSet(**{'par_id': param_id, 'params': parameters})) # self.__event_values['param_id'] = param_id def _parse_performance(self) -> None: self.__event_values['run_time'] = None self.__event_values['reads'] = None self.__event_values['writes'] = None self.__event_values['fetches'] = None self.__event_values['marks'] = None self.__event_values['access'] = None if not self.__current_block: return if 'records fetched' in self.__current_block[0]: line = self.__current_block.popleft() self.__event_values['records'] = int(line.split()[0]) else: self.__event_values['records'] = None values = self.__current_block.popleft().split(',') while values: next_value = values.pop() value, val_type = next_value.split() if 'ms' in val_type: self.__event_values['run_time'] = int(value) elif 'read' in val_type: self.__event_values['reads'] = int(value) elif 'write' in val_type: self.__event_values['writes'] = int(value) elif 'fetch' in val_type: self.__event_values['fetches'] = int(value) elif 'mark' in val_type: self.__event_values['marks'] = int(value) else: raise Error(f"Unhandled performance parameter {val_type}") if self.__current_block: self.__event_values['access'] = [] if self.__current_block.popleft() != "Table Natural Index Update Insert Delete Backout Purge Expunge": raise Error("Performance table header expected") if self.__current_block.popleft() != "*"*111: raise Error("Performance table header separator expected") while self.__current_block: entry = self.__current_block.popleft() self.__event_values['access'].append(AccessStats(intern(entry[:32].strip()), safe_int(entry[32:41].strip()), safe_int(entry[41:51].strip()), safe_int(entry[51:61].strip()), safe_int(entry[61:71].strip()), safe_int(entry[71:81].strip()), safe_int(entry[81:91].strip()), safe_int(entry[91:101].strip()), safe_int(entry[101:111].strip()))) def _parse_sql_info(self) -> None: plan = self.__event_values['plan'] sql = self.__event_values['sql'] key = (sql, plan) # if key in self.sqlinfo_map: sql_id = self.sqlinfo_map[key] else: sql_id = self.next_sql_id self.next_sql_id += 1 self.sqlinfo_map[key] = sql_id self.__infos.append(SQLInfo(**{'sql_id': sql_id, 'sql': sql, 'plan': plan,})) self.__event_values['sql_id'] = sql_id def _parse_trigger(self) -> None: trigger, event = self.__current_block.popleft().split('(') if ' FOR ' in trigger: trigger, table = trigger.split(' FOR ') self.__event_values['trigger'] = intern(trigger) self.__event_values['table'] = intern(table.strip()) else: self.__event_values['trigger'] = intern(trigger.strip()) self.__event_values['table'] = None self.__event_values['event'] = intern(event.strip('()')) def _parse_service(self) -> None: svc_id = '' line = self.__current_block.popleft() #if 'service_mgr' not in line: #raise Error("Service connection description expected.") _, _, spec = line.partition(' (') items = spec.strip('()').split(',') if len(items) == 4: svc_id, user, protocol_address, remote_process_id = items else: svc_id, user, protocol_address = items remote_process_id = None _, svc_id = svc_id.split(' ') svc_id = int(svc_id if svc_id.startswith('0x') else f'0x{svc_id}', 0) if svc_id not in self.seen_services: svc_values = {} svc_values['service_id'] = svc_id svc_values['user'] = intern(user.strip()) protocol_address = protocol_address.strip() if protocol_address == 'internal': protocol = address = protocol_address else: protocol, address = protocol_address.split(':', 1) svc_values['protocol'] = intern(protocol) svc_values['address'] = intern(address) if remote_process_id is not None: remote_process_id = remote_process_id.strip() remote_process, remote_pid = remote_process_id.rsplit(':', 1) svc_values['remote_process'] = intern(remote_process) svc_values['remote_pid'] = int(remote_pid) else: svc_values['remote_process'] = None svc_values['remote_pid'] = None self.__infos.append(ServiceInfo(**svc_values)) self.seen_services.add(svc_id) self.__event_values['service_id'] = svc_id def _parse_sweep_attachment(self) -> None: att_values = {} self._parse_attachment_info(att_values) self.__event_values['attachment_id'] = att_values['attachment_id'] def _parse_sweep_tr_counters(self) -> None: line = self.__current_block.popleft() if not line: line = self.__current_block.popleft() if 'Transaction counters:' not in line: raise Error("Transaction counters expected") while len(self.__current_block) > 0: line = self.__current_block.popleft() if 'Oldest interesting' in line: self.__event_values['oit'] = int(line.rsplit(' ', 1)[1]) elif 'Oldest active' in line: self.__event_values['oat'] = int(line.rsplit(' ', 1)[1]) elif 'Oldest snapshot' in line: self.__event_values['ost'] = int(line.rsplit(' ', 1)[1]) elif 'Next transaction' in line: self.__event_values['next'] = int(line.rsplit(' ', 1)[1]) elif 'ms' in line and len(self.__current_block) >= 0: # Put back performance counters self.__current_block.appendleft(line) break def __parse_trace_header(self) -> None: line = self.__current_block.popleft() items = line.split() self.__last_timestamp = datetime.datetime.strptime(items[0], '%Y-%m-%dT%H:%M:%S.%f') if (len(items) == 3) or (items[2] in ('ERROR', 'WARNING')): self.__event_values['status'] = Status.OK else: if items[2] == 'UNAUTHORIZED': self.__event_values['status'] = Status.UNAUTHORIZED elif items[2] == 'FAILED': self.__event_values['status'] = Status.FAILED elif items[2] == 'Unknown': self.__event_values['status'] = Status.UNKNOWN else: raise Error(f'Unrecognized event header: "{line}"') # self.__event_values['event_id'] = self.next_event_id self.next_event_id += 1 self.__event_values['timestamp'] = self.__last_timestamp def __parser_trace_suspend(self) -> EventTraceSuspend: # Session was suspended because log was full, so we will create fake event to note that line = self.__current_block.popleft() self.__event_values['timestamp'] = self.__last_timestamp self.__event_values['event_id'] = self.next_event_id self.next_event_id += 1 session_name = line[4:line.find(' is suspended')] self.__event_values['session_name'] = intern(session_name.replace(' ', '_').upper()) return EventTraceSuspend(**self.__event_values) def __parser_trace_init(self) -> EventTraceInit: self.__parse_trace_header() del self.__event_values['status'] self.__event_values['session_name'] = intern(self.__current_block.popleft()) return EventTraceInit(**self.__event_values) def __parser_trace_finish(self) -> EventTraceFinish: self.__parse_trace_header() del self.__event_values['status'] self.__event_values['session_name'] = intern(self.__current_block.popleft()) return EventTraceFinish(**self.__event_values) def __parser_start_transaction(self) -> EventTransactionStart: self.__parse_trace_header() # Attachment values = {} self._parse_attachment_info(values) self.__event_values['attachment_id'] = values['attachment_id'] # Transaction parameters self._parse_transaction_info(self.__event_values, check=False) return EventTransactionStart(**self.__event_values) def __parser_commit_transaction(self) -> EventCommit: self.__parse_trace_header() # Attachment values = {} self._parse_attachment_info(values) self.__event_values['attachment_id'] = values['attachment_id'] # Transaction parameters self._parse_transaction_info(self.__event_values, check=False) self._parse_transaction_performance() self.seen_transactions.remove(self.__event_values['transaction_id']) return EventCommit(**self.__event_values) def __parser_rollback_transaction(self) -> EventRollback: self.__parse_trace_header() # Attachment values = {} self._parse_attachment_info(values) self.__event_values['attachment_id'] = values['attachment_id'] # Transaction parameters self._parse_transaction_info(self.__event_values, check=False) self._parse_transaction_performance() self.seen_transactions.remove(self.__event_values['transaction_id']) return EventRollback(**self.__event_values) def __parser_commit_retaining(self) -> EventCommitRetaining: self.__parse_trace_header() # Attachment values = {} self._parse_attachment_info(values) self.__event_values['attachment_id'] = values['attachment_id'] # Transaction parameters self._parse_transaction_info(self.__event_values, check=False) if self.__current_block and self.__current_block[0].startswith('New number'): self.__event_values['new_transaction_id'] = int(self.__current_block.popleft().strip()[11:]) else: self.__event_values['new_transaction_id'] = None self._parse_transaction_performance() return EventCommitRetaining(**self.__event_values) def __parser_rollback_retaining(self) -> EventRollbackRetaining: self.__parse_trace_header() # Attachment values = {} self._parse_attachment_info(values) self.__event_values['attachment_id'] = values['attachment_id'] # Transaction parameters self._parse_transaction_info(self.__event_values, check=False) if self.__current_block and self.__current_block[0].startswith('New number'): self.__event_values['new_transaction_id'] = int(self.__current_block.popleft().strip()[11:]) else: self.__event_values['new_transaction_id'] = None self._parse_transaction_performance() return EventRollbackRetaining(**self.__event_values) def __parser_prepare_statement(self) -> EventPrepareStatement: self.__parse_trace_header() self._parse_attachment_and_transaction() self._parse_statement_id() self._parse_prepare_time() self._parse_sql_statement() self._parse_plan() self._parse_sql_info() # del self.__event_values['plan'] del self.__event_values['sql'] return EventPrepareStatement(**self.__event_values) def __parser_execute_statement_start(self) -> EventStatementStart: self.__parse_trace_header() self._parse_attachment_and_transaction() self._parse_statement_id() self._parse_sql_statement() self._parse_plan() self._parse_parameters() self._parse_sql_info() # del self.__event_values['plan'] del self.__event_values['sql'] return EventStatementStart(**self.__event_values) def __parser_execute_statement_finish(self) -> EventStatementFinish: self.__parse_trace_header() self._parse_attachment_and_transaction() self._parse_statement_id() self._parse_sql_statement() self._parse_plan() self._parse_parameters() self.__event_values['records'] = None self._parse_performance() self._parse_sql_info() # if not self.has_statement_free: del self.sqlinfo_map[self.__event_values['sql'], self.__event_values['plan']] del self.__event_values['plan'] del self.__event_values['sql'] return EventStatementFinish(**self.__event_values) def __parser_free_statement(self) -> EventFreeStatement: self.__parse_trace_header() att_values = {} self._parse_attachment_info(att_values) self.__event_values['attachment_id'] = att_values['attachment_id'] #self._parse_attachment_and_transaction() self._parse_statement_id() self._parse_sql_statement() self._parse_plan() self._parse_sql_info() del self.__event_values['status'] # del self.sqlinfo_map[self.__event_values['sql'], self.__event_values['plan']] del self.__event_values['plan'] del self.__event_values['sql'] return EventFreeStatement(**self.__event_values) def __parser_close_cursor(self) -> EventCloseCursor: self.__parse_trace_header() att_values = {} self._parse_attachment_info(att_values) self.__event_values['attachment_id'] = att_values['attachment_id'] #self._parse_attachment_and_transaction() self._parse_statement_id() self._parse_sql_statement() self._parse_plan() self._parse_sql_info() del self.__event_values['status'] # del self.__event_values['plan'] del self.__event_values['sql'] return EventCloseCursor(**self.__event_values) def __parser_trigger_start(self) -> EventTriggerStart: self.__parse_trace_header() self._parse_attachment_and_transaction() self._parse_trigger() return EventTriggerStart(**self.__event_values) def __parser_trigger_finish(self) -> EventTriggerFinish: self.__parse_trace_header() self._parse_attachment_and_transaction() self._parse_trigger() self._parse_performance() if 'records' in self.__event_values: del self.__event_values['records'] return EventTriggerFinish(**self.__event_values) def __parser_procedure_start(self) -> EventProcedureStart: self.__parse_trace_header() self._parse_attachment_and_transaction() _, name = self.__current_block.popleft().split() self.__event_values['procedure'] = intern(name[:-1]) self._parse_parameters(for_procedure=True) return EventProcedureStart(**self.__event_values) def __parser_procedure_finish(self) -> EventProcedureFinish: self.__parse_trace_header() self._parse_attachment_and_transaction() _, name = self.__current_block.popleft().split() self.__event_values['procedure'] = intern(name[:-1]) self._parse_parameters(for_procedure=True) self._parse_performance() return EventProcedureFinish(**self.__event_values) def __parser_func_start(self) -> EventProcedureStart: self.__parse_trace_header() self._parse_attachment_and_transaction() _, name = self.__current_block.popleft().split() self.__event_values['function'] = intern(name[:-1]) self._parse_parameters(for_procedure=True) return EventFunctionStart(**self.__event_values) def __parser_func_finish(self) -> EventProcedureFinish: self.__parse_trace_header() self._parse_attachment_and_transaction() _, name = self.__current_block.popleft().split() self.__event_values['function'] = intern(name[:-1]) self._parse_parameters(for_procedure=True) self.__current_block.popleft() # returns: self.__event_values['returns'] = self._parse_parameters_block()[0] self._parse_performance() if 'records' in self.__event_values: del self.__event_values['records'] return EventFunctionFinish(**self.__event_values) def __parser_create_db(self) -> EventCreate: self.__parse_trace_header() # Attachment parameters self._parse_attachment_info(self.__event_values, check=False) return EventCreate(**self.__event_values) def __parser_drop_db(self) -> EventDrop: self.__parse_trace_header() # Attachment parameters self._parse_attachment_info(self.__event_values, check=False) return EventDrop(**self.__event_values) def __parser_attach(self) -> EventAttach: self.__parse_trace_header() # Attachment parameters self._parse_attachment_info(self.__event_values, check=False) return EventAttach(**self.__event_values) def __parser_detach(self) -> EventDetach: self.__parse_trace_header() # Attachment parameters self._parse_attachment_info(self.__event_values, check=False) self.seen_attachments.remove(self.__event_values['attachment_id']) return EventDetach(**self.__event_values) def __parser_service_start(self) -> EventServiceStart: self.__parse_trace_header() self._parse_service() # service parameters action = self.__current_block.popleft().strip('"') self.__event_values['action'] = intern(action) parameters = [] while len(self.__current_block) > 0: parameters.append(self.__current_block.popleft()) self.__event_values['parameters'] = parameters # return EventServiceStart(**self.__event_values) def __parser_service_attach(self) -> EventServiceAttach: self.__parse_trace_header() self._parse_service() return EventServiceAttach(**self.__event_values) def __parser_service_detach(self) -> EventServiceDetach: self.__parse_trace_header() self._parse_service() self.seen_services.remove(self.__event_values['service_id']) return EventServiceDetach(**self.__event_values) def __parser_service_query(self) -> EventServiceQuery: self.__parse_trace_header() self._parse_service() # service parameters line = self.__current_block.popleft().strip() if line[0] == line[-1] == '"': action = line.strip('"') self.__event_values['action'] = intern(action) if len(self.__current_block) > 0: line = self.__current_block.popleft().strip() else: self.__event_values['action'] = None sent = [] received = [] while len(self.__current_block) > 0: #line = self.__current_block.popleft().strip() if line.startswith('Send portion of the query:'): while not line.startswith('Receive portion of the query:'): line = self.__current_block.popleft().strip() sent.append(line) if len(self.__current_block) == 0: break if line.startswith('Receive portion of the query:'): while len(self.__current_block) > 0: received.append(self.__current_block.popleft().strip()) self.__event_values['sent'] = sent self.__event_values['received'] = received # return EventServiceQuery(**self.__event_values) def __parser_set_context(self) -> EventSetContext: self.__parse_trace_header() self._parse_attachment_and_transaction() line = self.__current_block.popleft() context, line = line.split(']', 1) key, value = line.split('=', 1) self.__event_values['context'] = intern(context[1:]) self.__event_values['key'] = intern(key.strip()) self.__event_values['value'] = value.strip(' "') del self.__event_values['status'] return EventSetContext(**self.__event_values) def __parser_error(self) -> Union[EventServiceError, EventError]: self.__event_values['place'] = self.__current_block[0].split(' AT ')[1] self.__parse_trace_header() att_values = {} if 'service_mgr' in self.__current_block[0]: event_class = EventServiceError self._parse_service() else: event_class = EventError self._parse_attachment_info(att_values) self.__event_values['attachment_id'] = att_values['attachment_id'] details = [] while len(self.__current_block) > 0: details.append(self.__current_block.popleft()) self.__event_values['details'] = details del self.__event_values['status'] return event_class(**self.__event_values) def __parser_warning(self) -> Union[EventServiceWarning, EventWarning]: self.__event_values['place'] = self.__current_block[0].split(' AT ')[1] self.__parse_trace_header() att_values = {} if 'service_mgr' in self.__current_block[0]: event_class = EventServiceWarning self._parse_service() else: event_class = EventWarning self._parse_attachment_info(att_values) self.__event_values['attachment_id'] = att_values['attachment_id'] details = [] while len(self.__current_block) > 0: details.append(self.__current_block.popleft()) self.__event_values['details'] = details del self.__event_values['status'] return event_class(**self.__event_values) def __parser_sweep_start(self) -> EventSweepStart: self.__parse_trace_header() self._parse_sweep_attachment() self._parse_sweep_tr_counters() del self.__event_values['status'] return EventSweepStart(**self.__event_values) def __parser_sweep_progress(self) -> EventSweepProgress: self.__parse_trace_header() self._parse_sweep_attachment() self._parse_performance() del self.__event_values['status'] del self.__event_values['records'] return EventSweepProgress(**self.__event_values) def __parser_sweep_finish(self) -> EventSweepFinish: self.__parse_trace_header() self._parse_sweep_attachment() self._parse_sweep_tr_counters() self._parse_performance() del self.__event_values['status'] del self.__event_values['records'] return EventSweepFinish(**self.__event_values) def __parser_sweep_failed(self) -> EventSweepFailed: self.__parse_trace_header() self._parse_sweep_attachment() del self.__event_values['status'] return EventSweepFailed(**self.__event_values) def __parser_blr_compile(self) -> EventBLRCompile: self.__parse_trace_header() # Attachment values = {} self._parse_attachment_info(values) self.__event_values['attachment_id'] = values['attachment_id'] # BLR self._parse_blr_statement_id() self._parse_blrdyn_content() self._parse_prepare_time() return EventBLRCompile(**self.__event_values) def __parser_blr_execute(self) -> EventBLRExecute: self.__parse_trace_header() self._parse_attachment_and_transaction() # BLR self._parse_blr_statement_id() self._parse_blrdyn_content() self._parse_performance() del self.__event_values['records'] return EventBLRExecute(**self.__event_values) def __parser_dyn_execute(self) -> EventDYNExecute: self.__parse_trace_header() self._parse_attachment_and_transaction() # DYN self._parse_blrdyn_content() value, _ = self.__current_block.popleft().split() self.__event_values['run_time'] = int(value) return EventDYNExecute(**self.__event_values) def __parser_unknown(self) -> EventUnknown: items = self.__current_block[0].split() self.__parse_trace_header() self.__current_block.appendleft(' '.join(items[2:])) del self.__event_values['status'] self.__event_values['data'] = '\n'.join(self.__current_block) return EventUnknown(**self.__event_values)
[docs] def retrieve_info(self) -> List[TraceInfo]: """Returns list of `.TraceInfo` instances produced by last `.parse_event()` call. The list could be empty. Important: The internal buffer for info instances is cleared after call to this method, so all produced info instances are returned only once. """ result = self.__infos.copy() self.__infos.clear() return result
[docs] def parse_event(self, trace_block: List[str]) -> TraceEvent: """Parse single trace event. Arguments: trace_block: List with trace entry lines for single trace event. """ self.__current_block.clear() self.__current_block.extend(trace_block) self.__event_values.clear() if self._is_session_suspended(self.__current_block[0]): return self.__parser_trace_suspend() return self.__parse_map[self._identify_event(self.__current_block[0])]()
[docs] def parse(self, lines: Iterable): """Parse output from Firebird trace session. Arguments: lines: Iterable that return lines produced by Firebird trace session. Yields: `.TraceEvent` and `.TraceInfo` dataclasses describing individual trace log entries/events. Raises: firebird.base.types.Error: When any problem is found in input stream. """ for rec in (self.parse_event(x) for x in self._iter_trace_blocks(lines)): while len(self.__infos) > 0: yield self.__infos.popleft() yield rec
[docs] def push(self, line: Union[str, Sentinel]) -> Optional[List[Union[TraceEvent, TraceInfo]]]: """Push parser. Arguments: line: Single trace output line, or `~firebird.base.types.STOP` sentinel. Returns: None, or list with parsed elements (single event preceded by any info blocks related to it). Raises: Error: When pushed line is not recognized as part of trace event. """ if line is STOP: if self.__pushed: event = self.parse_event(self.__pushed) self.__pushed.clear() result = self.__infos.copy() result.append(event) return result else: line = line.strip() if line: if not self.__pushed: if self._is_entry_header(line): self.__pushed.append(line) else: raise Error(f'Unrecognized trace line "{line}"') else: if self._is_entry_header(line) or self._is_session_suspended(line): event = self.parse_event(self.__pushed) result = self.__infos.copy() self.__infos.clear() result.append(event) self.__pushed = [line] return result self.__pushed.append(line) return None