# Source code for dragonfly.engines.backend_sapi5.engine

#
# This file is part of Dragonfly.
# (c) Copyright 2007, 2008 by Christo Butcher
# Licensed under the LGPL.
#
#   Dragonfly is free software: you can redistribute it and/or modify it
#   under the terms of the GNU Lesser General Public License as published
#   by the Free Software Foundation, either version 3 of the License, or
#   (at your option) any later version.
#
#   Dragonfly is distributed in the hope that it will be useful, but
#   WITHOUT ANY WARRANTY; without even the implied warranty of
#   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
#   Lesser General Public License for more details.
#
#   You should have received a copy of the GNU Lesser General Public
#   License along with Dragonfly.  If not, see
#   <http://www.gnu.org/licenses/>.
#

"""
SAPI 5 engine classes
============================================================================

"""


#---------------------------------------------------------------------------

import logging
import time
import os.path

import pythoncom
from datetime         import datetime
from ctypes           import Structure, c_long, c_int, c_uint, pointer
from ctypes           import windll, WinError, WINFUNCTYPE
from ctypes.wintypes  import DWORD, HANDLE, HWND, LONG, UINT, WPARAM, LPARAM

import win32con
from six                       import string_types, integer_types
from win32com.client           import Dispatch, getevents, constants
from win32com.client.gencache  import EnsureDispatch

from dragonfly.grammar.recobs  import RecognitionObserver
from dragonfly.windows.window  import Window
from dragonfly.engines.base    import (EngineBase, EngineError,
                                       MimicFailure, DelegateTimerManager,
                                       DelegateTimerManagerInterface,
                                       DictationContainerBase,
                                       GrammarWrapperBase)
from dragonfly.engines.backend_sapi5.speaker   import Sapi5Speaker
from dragonfly.engines.backend_sapi5.compiler  import Sapi5Compiler
from dragonfly.engines.backend_sapi5.recobs    import Sapi5RecObsManager


#===========================================================================

class POINT(Structure):
    # Win32 POINT structure: two LONG coordinates.
    _fields_ = [('x', c_long),
                ('y', c_long)]

class MSG(Structure):
    # Win32 MSG structure, read by the mimic() message loop.
    #
    # Fix: hwnd, wParam and lParam are pointer-sized in the Win32 ABI
    # (HWND, WPARAM, LPARAM).  Declaring them as c_int misaligns every
    # subsequent field on 64-bit Python, so GetMessageW() would write
    # past the declared struct and 'message' would be read from the
    # wrong offset.  Use the proper ctypes.wintypes types instead.
    _fields_ = [('hwnd', HWND),
                ('message', UINT),
                ('wParam', WPARAM),
                ('lParam', LPARAM),
                ('time', DWORD),
                ('pt', POINT)]


class MimicObserver(RecognitionObserver):
    """ Recognition observer used by mimic() to track whether the
        emulated recognition succeeded, failed, or did nothing. """

    _log = logging.getLogger("SAPI5 RecObs")

    def __init__(self):
        RecognitionObserver.__init__(self)
        # "none" means no recognition event has been seen yet; mimic()
        # interprets this as "nothing happened".
        self.status = "none"

    def on_recognition(self, words):
        self._log.debug("SAPI5 RecObs on_recognition(): %r", words)
        self.status = "recognition: %r" % (words,)

    def on_failure(self):
        self._log.debug("SAPI5 RecObs on_failure()")
        self.status = "failure"


#===========================================================================

class Sapi5SharedEngine(EngineBase, DelegateTimerManagerInterface):
    """ Speech recognition engine back-end for SAPI 5 shared recognizer. """

    _name = "sapi5shared"
    recognizer_dispatch_name = "SAPI.SpSharedRecognizer"
    DictationContainer = DictationContainerBase

    #-----------------------------------------------------------------------

    def __init__(self, retain_dir=None):
        """
        :param retain_dir: Retains recognized audio and/or metadata in
            the given directory, saving audio to ``retain_[timestamp].wav``
            file and metadata to ``retain.tsv``.  Disabled by default
            (``None``).
        :type retain_dir: str|None
        """
        EngineBase.__init__(self)
        DelegateTimerManagerInterface.__init__(self)

        # Generate the COM type libraries up front so that the
        # win32com 'constants' attributes are available later.
        EnsureDispatch(self.recognizer_dispatch_name)
        EnsureDispatch("SAPI.SpVoice")

        # COM handles are created lazily in connect().
        self._recognizer = None
        self._compiler = None
        self._speaker = None
        self._recognition_observer_manager = Sapi5RecObsManager(self)
        self._timer_manager = DelegateTimerManager(0.02, self)

        # Validate retain_dir; fall back to disabled (None) and log an
        # error for invalid values instead of raising.
        if isinstance(retain_dir, string_types) or retain_dir is None:
            self._retain_dir = retain_dir
        else:
            self._retain_dir = None
            self._log.error("Invalid retain_dir: %r" % retain_dir)
[docs] def connect(self): """ Connect to back-end SR engine. """ self._recognizer = Dispatch(self.recognizer_dispatch_name) self._speaker = Sapi5Speaker() self._compiler = Sapi5Compiler()
[docs] def disconnect(self): """ Disconnect from back-end SR engine. """ self._recognizer = None self._speaker = None self._compiler = None
#----------------------------------------------------------------------- # Methods for working with grammars. def _load_grammar(self, grammar): """ Load the given *grammar*. """ self._log.debug("Loading grammar %s." % grammar.name) if not self._recognizer: self.connect() # Create recognition context, compile grammar, and create # the grammar wrapper object for managing this grammar. context = self._recognizer.CreateRecoContext() if self._retain_dir: context.RetainedAudio = constants.SRAORetainAudio handle = self._compiler.compile_grammar(grammar, context) wrapper = GrammarWrapper(grammar, handle, context, self, self._recognition_observer_manager) handle.State = constants.SGSEnabled for rule in grammar.rules: handle.CmdSetRuleState(rule.name, constants.SGDSActive) # self.activate_grammar(grammar) # for l in grammar.lists: # l._update() handle.CmdSetRuleState("_FakeRule", constants.SGDSActive) return wrapper def _unload_grammar(self, grammar, wrapper): """ Unload the given *grammar*. """ try: wrapper.handle.State = constants.SGSDisabled except Exception as e: self._log.exception("Failed to unload grammar %s: %s." % (grammar, e))
[docs] def activate_grammar(self, grammar): """ Activate the given *grammar*. """ self._log.debug("Activating grammar %s." % grammar.name) grammar_handle = self._get_grammar_wrapper(grammar).handle grammar_handle.State = constants.SGSEnabled
[docs] def deactivate_grammar(self, grammar): """ Deactivate the given *grammar*. """ self._log.debug("Deactivating grammar %s." % grammar.name) grammar_handle = self._get_grammar_wrapper(grammar).handle grammar_handle.State = constants.SGSDisabled
[docs] def activate_rule(self, rule, grammar): """ Activate the given *rule*. """ self._log.debug("Activating rule %s in grammar %s." % (rule.name, grammar.name)) grammar_handle = self._get_grammar_wrapper(grammar).handle grammar_handle.CmdSetRuleState(rule.name, constants.SGDSActive)
[docs] def deactivate_rule(self, rule, grammar): """ Deactivate the given *rule*. """ self._log.debug("Deactivating rule %s in grammar %s." % (rule.name, grammar.name)) grammar_handle = self._get_grammar_wrapper(grammar).handle grammar_handle.CmdSetRuleState(rule.name, constants.SGDSInactive)
def update_list(self, lst, grammar): grammar_handle = self._get_grammar_wrapper(grammar).handle list_rule_name = "__list_%s" % lst.name rule_handle = grammar_handle.Rules.FindRule(list_rule_name) rule_handle.Clear() src_state = rule_handle.InitialState dst_state = None for item in lst.get_list_items(): src_state.AddWordTransition(dst_state, item) grammar_handle.Rules.Commit()
[docs] def set_exclusiveness(self, grammar, exclusive): self._log.debug("Setting exclusiveness of grammar %s to %s." % (grammar.name, exclusive)) wrapper = self._get_grammar_wrapper(grammar) if exclusive and wrapper.handle.State != constants.SGSExclusive: wrapper.state_before_exclusive = wrapper.handle.State wrapper.handle.State = constants.SGSExclusive elif not exclusive and wrapper.handle.State == constants.SGSExclusive: assert wrapper.state_before_exclusive in (constants.SGSEnabled, constants.SGSDisabled) wrapper.handle.State = wrapper.state_before_exclusive
# grammar_handle.SetGrammarState(constants.SPGS_EXCLUSIVE) #----------------------------------------------------------------------- # Miscellaneous methods.
[docs] def mimic(self, words): """ Mimic a recognition of the given *words*. .. note:: This method has a few quirks to be aware of: #. Mimic can fail to recognize a command if the relevant grammar is not yet active. #. Mimic does not work reliably with the shared recognizer unless there are one or more exclusive grammars active. #. Mimic can **crash the process** in some circumstances, e.g. when mimicking non-ASCII characters. """ self._log.debug("SAPI5 mimic: %r" % (words,)) if isinstance(words, string_types): phrase = words else: phrase = " ".join(words) # Fail on empty input. if not phrase: raise MimicFailure("Invalid mimic input %r" % phrase) # Register a recognition observer for checking the success of this # mimic. observer = MimicObserver() observer.register() # Emulate recognition of the phrase and wait for recognition to # finish, timing out after 2 seconds. self._recognizer.EmulateRecognition(phrase) timeout = 2 NULL = c_int(win32con.NULL) if timeout != None: begin_time = time.time() windll.user32.SetTimer(NULL, NULL, int(timeout * 1000), NULL) message = MSG() message_pointer = pointer(message) while (not timeout) or (time.time() - begin_time < timeout): if timeout: self._log.debug("SAPI5 message loop: %s sec left" % (timeout + begin_time - time.time())) else: self._log.debug("SAPI5 message loop: no timeout") if windll.user32.GetMessageW(message_pointer, NULL, 0, 0) == 0: msg = str(WinError()) self._log.error("GetMessageW() failed: %s" % msg) raise EngineError("GetMessageW() failed: %s" % msg) self._log.debug("SAPI5 message: %r" % (message.message,)) if message.message == win32con.WM_TIMER: # A timer message means this loop has timed out. self._log.debug("SAPI5 message loop timed out: %s sec left" % (timeout + begin_time - time.time())) break else: # Process other messages as normal. 
self._log.debug("SAPI5 message translating and dispatching.") windll.user32.TranslateMessage(message_pointer) windll.user32.DispatchMessageW(message_pointer) if observer.status.startswith("recognition:"): # The previous message was a recognition which matched. self._log.debug("SAPI5 message caused recognition.") # Unregister the observer and check its status. observer.unregister() if observer.status == "failure": raise MimicFailure("Mimic failed.") elif observer.status == "none": raise MimicFailure("Mimic failed, nothing happened.")
[docs] def speak(self, text): """ Speak the given *text* using text-to-speech. """ self._speaker.speak(text)
def _get_language(self): if not self._recognizer: return "en" # Get Windows language identifiers for supported languages from the # recognizer's current status information. languages = self._recognizer.Status.SupportedLanguages # Lookup and return the language tag for the first supported # language ID. if languages: return self._get_language_tag(languages[0]) else: return "en" def _has_quoted_words_support(self): return False def _do_recognition(self): """ Recognize speech in a loop. This will also call any scheduled timer functions and ensure that the correct window context is used. """ # Register for window change events to activate/deactivate grammars # and rules on window changes, including window title changes. This # is done here because the SAPI5 'OnPhraseStart' grammar callback is # called after grammar state changes are allowed. WinEventProcType = WINFUNCTYPE(None, HANDLE, DWORD, HWND, LONG, LONG, DWORD, DWORD) self._last_foreground_window = None self._last_foreground_window_title = None def callback(hWinEventHook, event, hwnd, idObject, idChild, dwEventThread, dwmsEventTime): window = Window.get_foreground() # Note: hwnd doesn't always match window.handle, even when # foreground window changed (and sometimes it didn't change) window_changed = ( window != self._last_foreground_window or window == self._last_foreground_window and window.title != self._last_foreground_window_title ) if window_changed: self.process_grammars_context(window) self._last_foreground_window = window self._last_foreground_window_title = window.title def set_hook(win_event_proc, event_type): return windll.user32.SetWinEventHook( event_type, event_type, 0, win_event_proc, 0, 0, win32con.WINEVENT_OUTOFCONTEXT) win_event_proc = WinEventProcType(callback) windll.user32.SetWinEventHook.restype = HANDLE events = {win32con.EVENT_SYSTEM_FOREGROUND, win32con.EVENT_OBJECT_NAMECHANGE} hook_ids = [set_hook(win_event_proc, event) for event in events] # Recognize speech, call timer functions and 
handle window change # events in a loop. Stop on disconnect(). self.speak('beginning loop!') try: while self._recognizer is not None: pythoncom.PumpWaitingMessages() self.call_timer_callback() time.sleep(0.005) finally: # Unregister event hooks. for hook_id in hook_ids: windll.user32.UnhookWinEvent(hook_id)
#---------------------------------------------------------------------------
# Make the shared engine available as Sapi5Engine, for backwards
# compatibility.

Sapi5Engine = Sapi5SharedEngine


#===========================================================================
class Sapi5InProcEngine(Sapi5SharedEngine):
    """ Speech recognition engine back-end for SAPI 5 in process
        recognizer. """

    _name = "sapi5inproc"
    recognizer_dispatch_name = "SAPI.SpInProcRecognizer"
[docs] def connect(self, audio_source=0): """ Connect to the speech recognition backend. The audio source to use for speech recognition can be specified using the *audio_source* argument. If it is not given, it defaults to the first audio source found. """ Sapi5SharedEngine.connect(self) self.select_audio_source(audio_source)
[docs] def get_audio_sources(self): """ Get the available audio sources. This method returns a list of audio sources, each represented by a 3-element tuple: the index, the description, and the COM handle for the audio source. """ available_sources = self._recognizer.GetAudioInputs() audio_sources_list = [] for index, item in enumerate(collection_iter(available_sources)): audio_sources_list.append((index, item.GetDescription(), item)) return audio_sources_list
[docs] def select_audio_source(self, audio_source): """ Configure the speech recognition engine to use the given audio source. The audio source may be specified as follows: - As an *int* specifying the index of the audio source to use - As a *str* containing the description of the audio source to use, or a substring thereof The :meth:`get_audio_sources()` method can be used to retrieve the available sources together with their indices and descriptions. """ available_sources = self._recognizer.GetAudioInputs() if isinstance(audio_source, integer_types): # Parameter is the index of the source to use. if 0 <= audio_source < available_sources.Count: selected_source = available_sources.Item(audio_source) else: raise EngineError("Invalid audio source index: %r" " (%s sources available, so index must be" " in range 0 to %s)" % (audio_source, available_sources.Count, available_sources.Count - 1)) elif isinstance(audio_source, string_types): for item in collection_iter(available_sources): if audio_source in item.GetDescription(): selected_source = item break else: raise EngineError("Audio source not found: %r" % (audio_source)) else: raise EngineError("Invalid audio source qualifier: %r" % (audio_source)) self._log.info("Selecting audio source: %r" % (selected_source.GetDescription(),)) self._recognizer.AudioInput = selected_source
#---------------------------------------------------------------------------
# Utility generator function for iterating over COM collections.

def collection_iter(collection):
    if not collection:
        return
    for index in range(0, collection.Count):
        yield collection.Item(index)


#---------------------------------------------------------------------------

class GrammarWrapper(GrammarWrapperBase):

    def __init__(self, grammar, handle, context, engine, recobs_manager):
        GrammarWrapperBase.__init__(self, grammar, engine, recobs_manager)
        self.handle = handle
        self.context = context
        # Remembered by set_exclusiveness() so the previous state can be
        # restored when exclusivity is lifted.
        self.state_before_exclusive = handle.State

        # Register callback functions which will handle recognizer
        # events.
        base = getevents("SAPI.SpSharedRecoContext")
        class ContextEvents(base):
            pass
        c = ContextEvents(context)
        c.OnPhraseStart = self.phrase_start_callback
        c.OnRecognition = self.recognition_callback
        if hasattr(grammar, "process_recognition_other"):
            c.OnRecognitionForOtherContext = \
                self.recognition_other_callback
        if hasattr(grammar, "process_recognition_failure"):
            c.OnFalseRecognition = self.recognition_failure_callback

    def phrase_start_callback(self, stream_number, stream_position):
        # Let the grammar (de)activate rules for the current foreground
        # window before the recognition completes.
        window = Window.get_foreground()
        self.grammar.process_begin(window.executable, window.title,
                                   window.handle)

    def _retain_audio(self, newResult, results, rule_name):
        # Only write audio data and metadata if the directory exists.
        retain_dir = self.engine._retain_dir
        if retain_dir and not os.path.isdir(retain_dir):
            self.engine._log.warning(
                "Audio was not retained because '%s' was not a "
                "directory" % retain_dir
            )
        elif retain_dir:
            try:
                file_stream = Dispatch("SAPI.SpFileStream")
                # Note: application can also retrieve smaller portions
                # of the audio stream by specifying a starting phrase
                # element and phrase element length.
                audio_stream = newResult.Audio()

                # Make sure we have audio data, which we wouldn't from a
                # mimic or if the retain flag wasn't set above.
                if audio_stream:
                    # Write audio data.
                    file_stream.Format = audio_stream.Format
                    now = datetime.now()
                    filename = ("retain_%s.wav"
                                % now.strftime("%Y-%m-%d_%H-%M-%S_%f"))
                    wav_path = os.path.join(retain_dir, filename)
                    flags = constants.SSFMCreateForWrite
                    file_stream.Open(wav_path, flags)
                    try:
                        file_stream.Write(audio_stream.GetData())
                    finally:
                        file_stream.Close()

                    # Write metadata.  Times.Length is in 100 ns units.
                    words = ' '.join([r[2] for r in results])
                    audio_length = int(newResult.Times.Length) / 1e7
                    tsv_path = os.path.join(retain_dir, "retain.tsv")
                    with open(tsv_path, "a") as tsv_file:
                        tsv_file.write('\t'.join([
                            filename, str(audio_length),
                            self.grammar.name, rule_name, words
                        ]) + '\n')
            # Fix: narrowed from a bare "except:"; retention is
            # best-effort, so log and continue.
            except Exception:
                self.engine._log.exception("Exception retaining audio")

    def recognition_callback(self, StreamNumber, StreamPosition,
                             RecognitionType, Result):
        # Fix: initialize before the try block so that the failure log
        # at the bottom cannot raise NameError when an exception occurs
        # before any elements are processed.
        results = []
        try:
            newResult = Dispatch(Result)
            phrase_info = newResult.PhraseInfo
            rule_name = phrase_info.Rule.Name

            #---------------------------------------------------------------
            # Build a list of rule names for each element.

            # First populate it with the top level rule name.
            element = phrase_info.Rule
            name = element.Name
            start = element.FirstElement
            count = element.NumberOfElements
            rule_names = [name] * count

            # Walk the tree of child rules and put their names in the
            # list.
            stack = [collection_iter(phrase_info.Rule.Children)]
            while stack:
                try:
                    element = next(stack[-1])
                except StopIteration:
                    stack.pop()
                    continue
                name = element.Name
                start = element.FirstElement
                count = element.NumberOfElements
                rule_names[start:start + count] = [name] * count
                if element.Children:
                    stack.append(collection_iter(element.Children))

            #---------------------------------------------------------------
            # Prepare the words and rule names for the element parsers.

            replacements = [False] * len(rule_names)
            if phrase_info.Replacements:
                for replacement in collection_iter(
                        phrase_info.Replacements):
                    begin = replacement.FirstElement
                    end = begin + replacement.NumberOfElements
                    replacements[begin] = replacement.Text
                    for index in range(begin + 1, end):
                        replacements[index] = True

            rule_set = list(set(rule_names))
            elements = phrase_info.Elements
            for index in range(len(rule_names)):
                element = elements.Item(index)
                rule_id = rule_set.index(rule_names[index])
                # Map dictation rule IDs to 1M so that dragonfly
                # recognizes the words as dictation.
                if rule_names[index] == "dgndictation":
                    rule_id = 1000000
                replacement = replacements[index]
                info = [element.LexicalForm, rule_id,
                        element.DisplayText, element.DisplayAttributes,
                        replacement]
                results.append(info)

            #---------------------------------------------------------------
            # Retain audio.
            # Fix: arguments were swapped (results, newResult, ...),
            # which made retention always raise (and be swallowed);
            # the signature is (newResult, results, rule_name).
            self._retain_audio(newResult, results, rule_name)

            #---------------------------------------------------------------
            # Attempt to parse the recognition.
            if self.process_results(results, rule_set, newResult):
                return

        except Exception as e:
            Sapi5Engine._log.error("Grammar %s: exception: %s"
                                   % (self.grammar._name, e),
                                   exc_info=True)

        #-------------------------------------------------------------------
        # If this point is reached, then the recognition was not
        # processed successfully..
        self._log.error("Grammar %s: failed to decode recognition %r."
                        % (self.grammar._name, [r[0] for r in results]))

    def recognition_other_callback(self, StreamNumber, StreamPosition):
        # Note that SAPI 5.3 doesn't offer access to the actual
        # recognition contents during a OnRecognitionForOtherContext
        # event.
        func = getattr(self.grammar, "process_recognition_other", None)
        self._process_grammar_callback(func, words=False, results=None)

    def recognition_failure_callback(self, StreamNumber, StreamPosition,
                                     Result):
        func = getattr(self.grammar, "process_recognition_failure", None)
        self._process_grammar_callback(func, results=Dispatch(Result))