Source code for dragonfly.engines.backend_natlink.engine

#
# This file is part of Dragonfly.
# (c) Copyright 2007, 2008 by Christo Butcher
# Licensed under the LGPL.
#
#   Dragonfly is free software: you can redistribute it and/or modify it
#   under the terms of the GNU Lesser General Public License as published
#   by the Free Software Foundation, either version 3 of the License, or
#   (at your option) any later version.
#
#   Dragonfly is distributed in the hope that it will be useful, but
#   WITHOUT ANY WARRANTY; without even the implied warranty of
#   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
#   Lesser General Public License for more details.
#
#   You should have received a copy of the GNU Lesser General Public
#   License along with Dragonfly.  If not, see
#   <http://www.gnu.org/licenses/>.
#

"""
SR back-end for DNS and Natlink
============================================================================

Detecting sleep mode
----------------------------------------------------------------------------

 - http://blogs.msdn.com/b/tsfaware/archive/2010/03/22/detecting-sleep-mode-in-sapi.aspx

"""

import os
import os.path
import pywintypes
import sys
import time
from datetime   import datetime
from locale     import getpreferredencoding
from threading  import Thread, Event

from six        import text_type, binary_type, string_types, PY2


from dragonfly.engines.base  import (EngineBase, EngineError, MimicFailure,
                                    GrammarWrapperBase)
from dragonfly.engines.backend_natlink.speaker    import NatlinkSpeaker
from dragonfly.engines.backend_natlink.compiler   import NatlinkCompiler
from dragonfly.engines.backend_natlink.dictation  import \
    NatlinkDictationContainer
from dragonfly.engines.backend_natlink.recobs     import \
    NatlinkRecObsManager
from dragonfly.engines.backend_natlink.timer      import NatlinkTimerManager


# ---------------------------------------------------------------------------

def map_word(word, encoding=getpreferredencoding(do_setlocale=False)):
    """
    Wraps output from Dragon.

    This wrapper ensures text output from the engine is Unicode. It assumes the
    encoding of byte streams is the current locale's preferred encoding by default.
    """
    if isinstance(word, text_type):
        return word
    elif isinstance(word, binary_type):
        return word.decode(encoding)
    return word


class TimerThread(Thread):
    """"""
    def __init__(self, engine):
        Thread.__init__(self)
        self._stop_event = Event()
        self.daemon = True
        self._timer = None
        self._engine = engine

    def start(self):
        if self._timer is None:
            def timer_function():
                # Let the thread run for a bit. This will yield control to
                # other threads.
                if self.is_alive():
                    self.join(0.0025)

            self._timer = self._engine.create_timer(timer_function, 0.025)

        Thread.start(self)

    def _stop_timer(self):
        if self._timer:
            self._timer.stop()
            self._timer = None

    def stop(self):
        self._stop_event.set()
        self._stop_timer()

    def run(self):
        while not self._stop_event.is_set():
            time.sleep(1)
        self._stop_timer()


[docs]class NatlinkEngine(EngineBase): """ Speech recognition engine back-end for Natlink and DNS. """ _name = "natlink" DictationContainer = NatlinkDictationContainer #----------------------------------------------------------------------- def __init__(self, retain_dir=None): """ :param retain_dir: directory to save audio data: A ``.wav`` file for each utterance, and ``retain.tsv`` file with each row listing (wav filename, wav length in seconds, grammar name, rule name, recognized text) as tab separated values. If this parameter is used in a module loaded by ``natlinkmain``, then the directory will be relative to the Natlink user directory (e.g. ``MacroSystem``). :type retain_dir: str|None """ EngineBase.__init__(self) self.natlink = None try: import natlink except ImportError: self._log.error("%s: failed to import natlink module." % self) raise EngineError("Requested engine 'natlink' is not " "available: Natlink is not installed.") self.natlink = natlink self._grammar_count = 0 self._recognition_observer_manager = NatlinkRecObsManager(self) self._timer_manager = NatlinkTimerManager(0.02, self) self._timer_thread = None self._retain_dir = None self._speaker = NatlinkSpeaker() try: self.set_retain_directory(retain_dir) except EngineError as err: self._retain_dir = None self._log.error(err)
[docs] def apply_threading_fix(self): """ Start a thread and engine timer internally to allow Python threads to work properly while connected to natlink. The fix is only applied once, successive calls have no effect. This method is called automatically when :meth:`connect` is called or when a grammar is loaded for the first time. """ # Start a thread and engine timer to allow Python threads to work # properly while connected to Natlink. # Only start the thread if one isn't already active. if self._timer_thread is None: self._timer_thread = TimerThread(self) self._timer_thread.start()
[docs] def connect(self): """ Connect to natlink with Python threading support enabled. """ self.natlink.natConnect(True) self.apply_threading_fix()
[docs] def disconnect(self): """ Disconnect from natlink. """ # Unload all grammars from the engine so that Dragon doesn't keep # recognizing them. for grammar in self.grammars: grammar.unload() # Close the the waitForSpeech() dialog box if it is active for this # process. from dragonfly import Window target_title = "Natlink / Python Subsystem" for window in Window.get_matching_windows(title=target_title): if window.is_visible and window.pid == os.getpid(): try: window.close() except pywintypes.error: pass break # Stop the special timer thread if it is running. if self._timer_thread: self._timer_thread.stop() self._timer_thread = None # Finally disconnect from natlink. self.natlink.natDisconnect()
# ----------------------------------------------------------------------- # Methods for working with grammars. def _load_grammar(self, grammar): """ Load the given *grammar* into natlink. """ self._log.debug("Engine %s: loading grammar %s." % (self, grammar.name)) grammar_object = self.natlink.GramObj() wrapper = GrammarWrapper(grammar, grammar_object, self, self._recognition_observer_manager) grammar_object.setBeginCallback(wrapper.begin_callback) grammar_object.setResultsCallback(wrapper.results_callback) grammar_object.setHypothesisCallback(None) c = NatlinkCompiler() (compiled_grammar, rule_names) = c.compile_grammar(grammar) wrapper.rule_names = rule_names all_results = (hasattr(grammar, "process_recognition_other") or hasattr(grammar, "process_recognition_failure")) hypothesis = False attempt_connect = False try: grammar_object.load(compiled_grammar, all_results, hypothesis) except self.natlink.NatError as e: # If loading failed because we're not connected yet, # attempt to connect to natlink and reload the grammar. if (str(e) == "Calling GramObj.load is not allowed before" " calling natConnect"): attempt_connect = True else: self._log.exception("Failed to load grammar %s: %s." % (grammar, e)) raise EngineError("Failed to load grammar %s: %s." % (grammar, e)) if attempt_connect: self.connect() try: grammar_object.load(compiled_grammar, all_results, hypothesis) except self.natlink.NatError as e: self._log.exception("Failed to load grammar %s: %s." % (grammar, e)) raise EngineError("Failed to load grammar %s: %s." % (grammar, e)) # Apply the threading fix if it hasn't been applied yet. self.apply_threading_fix() # Return the grammar wrapper. return wrapper def _unload_grammar(self, grammar, wrapper): """ Unload the given *grammar* from natlink. """ try: grammar_object = wrapper.grammar_object grammar_object.unload() grammar_object.setBeginCallback(None) grammar_object.setResultsCallback(None) grammar_object.setHypothesisCallback(None) except self.natlink.NatError as e: self._log.exception("Failed to unload grammar %s: %s." % (grammar, e))
[docs] def set_exclusiveness(self, grammar, exclusive): try: grammar_object = self._get_grammar_wrapper(grammar).grammar_object grammar_object.setExclusive(exclusive) except self.natlink.NatError as e: self._log.exception("Engine %s: failed set exclusiveness: %s." % (self, e))
def activate_grammar(self, grammar): self._log.debug("Activating grammar %s." % grammar.name) pass def deactivate_grammar(self, grammar): self._log.debug("Deactivating grammar %s." % grammar.name) pass def activate_rule(self, rule, grammar): self._log.debug("Activating rule %s in grammar %s." % (rule.name, grammar.name)) wrapper = self._get_grammar_wrapper(grammar) if not wrapper: return grammar_object = wrapper.grammar_object grammar_object.activate(rule.name, 0) def deactivate_rule(self, rule, grammar): self._log.debug("Deactivating rule %s in grammar %s." % (rule.name, grammar.name)) wrapper = self._get_grammar_wrapper(grammar) if not wrapper: return grammar_object = wrapper.grammar_object grammar_object.deactivate(rule.name) def update_list(self, lst, grammar): wrapper = self._get_grammar_wrapper(grammar) if not wrapper: return grammar_object = wrapper.grammar_object # First empty then populate the list. Use the local variables # n and f as an optimization. n = lst.name f = grammar_object.appendList grammar_object.emptyList(n) [f(n, word) for word in lst.get_list_items()] #----------------------------------------------------------------------- # Miscellaneous methods. def _do_recognition(self): self.natlink.waitForSpeech()
[docs] def mimic(self, words): """ Mimic a recognition of the given *words*. .. note:: This method has a few quirks to be aware of: #. Mimic is not limited to one element per word as seen with proper nouns from DNS. For example, "Buffalo Bills" can be passed as one word. #. Mimic can handle by the extra formatting by DNS built-in commands. #. Mimic is case sensitive. """ if isinstance(words, string_types): words = words.split() try: prepared_words = [] if PY2: encoding = getpreferredencoding() for word in words: if isinstance(word, text_type): word = word.encode(encoding) prepared_words.append(word) else: for word in words: prepared_words.append(word) if len(prepared_words) == 0: raise TypeError("empty list or string") except Exception as e: raise MimicFailure("Invalid mimic input %r: %s." % (words, e)) try: self.natlink.recognitionMimic(prepared_words) except self.natlink.MimicFailed: raise MimicFailure("No matching rule found for words %r." % (prepared_words,))
[docs] def speak(self, text): """ Speak the given *text* using text-to-speech. """ self._speaker.speak(text)
def _get_language(self): # Get a Windows language identifier from Dragon. import win32com.client app = win32com.client.Dispatch("Dragon.DgnEngineControl") language = app.SpeakerLanguage("") # Lookup and return the language tag. return self._get_language_tag(language) def _has_quoted_words_support(self): return True
[docs] def set_retain_directory(self, retain_dir): """ Set the directory where audio data is saved. Retaining audio data may be useful for acoustic model training. This is disabled by default. If a relative path is used and the code is running via natspeak.exe, then the path will be made relative to the Natlink user directory or base directory (e.g. ``MacroSystem``). :param retain_dir: retain directory path :type retain_dir: string|None """ is_string = isinstance(retain_dir, string_types) if not (retain_dir is None or is_string): raise EngineError("Invalid retain_dir: %r" % retain_dir) if is_string: # Handle relative paths by using the Natlink user directory or # base directory. Only do this if running via natspeak.exe. try: import natlinkstatus except ImportError: natlinkstatus = None running_via_natspeak = ( sys.executable.endswith("natspeak.exe") and natlinkstatus is not None ) if not os.path.isabs(retain_dir) and running_via_natspeak: status = natlinkstatus.NatlinkStatus() user_dir = status.getUserDirectory() retain_dir = os.path.join( # Use the base dir if user dir isn't set. user_dir if user_dir else status.BaseDirectory, retain_dir ) # Set the retain directory. self._retain_dir = retain_dir
#--------------------------------------------------------------------------- class GrammarWrapper(GrammarWrapperBase): # Enable guessing at which words were dictated, since DNS does not # always report accurate rule IDs. _dictated_word_guesses_enabled = True def __init__(self, grammar, grammar_object, engine, recobs_manager): GrammarWrapperBase.__init__(self, grammar, engine, recobs_manager) self.grammar_object = grammar_object self.rule_names = None def begin_callback(self, module_info): executable, title, handle = tuple(map_word(word) for word in module_info) self.grammar.process_begin(executable, title, handle) def _decode_grammar_rules(self, state, words, results, *args): # Iterate through this grammar's rules, attempting to decode each. # If successful, call that rule's method for processing the # recognition and return. for rule in self.grammar.rules: if not (rule.active and rule.exported): continue state.initialize_decoding() for _ in rule.decode(state): if state.finished(): self._retain_audio(words, results, rule.name) root = state.build_parse_tree() # Notify observers using the manager *before* # processing. # TODO Use words="other" instead, with a special # recobs grammar wrapper at index 0. notify_args = (words, rule, root, results) self.recobs_manager.notify_recognition( *notify_args ) try: rule.process_recognition(root) except Exception as e: self._log.exception("Failed to process rule " "'%s': %s" % (rule.name, e)) self.recobs_manager.notify_post_recognition( *notify_args ) return True return False def results_callback(self, words, results): self._log.debug("Grammar %s: received recognition %r." % (self.grammar.name, words)) if words == "other": result_words = tuple(map_word(w) for w in results.getWords(0)) self.process_special_results(words, result_words, results) return elif words == "reject": self.process_special_results(words, None, results) return # If the words argument was not "other" or "reject", then # it is a sequence of (word, rule_id) 2-tuples. Convert this # into a tuple of unicode objects. words_rules = tuple((map_word(w), r) for w, r in words) words = tuple(w for w, r in words_rules) # Process this recognition. if self.process_results(words_rules, self.rule_names, results): return # Failed to decode recognition. self._log.error("Grammar %s: failed to decode recognition %r." % (self.grammar._name, words)) def _retain_audio(self, words, results, rule_name): # Only write audio data and metadata if the directory exists. retain_dir = self.engine._retain_dir if retain_dir and not os.path.isdir(retain_dir): self.engine._log.warning( "Audio was not retained because '%s' was not a " "directory" % retain_dir ) elif retain_dir: try: audio = results.getWave() # Make sure we have audio data if len(audio) > 0: # Write audio data. now = datetime.now() filename = ("retain_%s.wav" % now.strftime("%Y-%m-%d_%H-%M-%S_%f")) wav_path = os.path.join(retain_dir, filename) with open(wav_path, "wb") as f: f.write(audio) # Write metadata, assuming 11025Hz 16bit mono audio text = ' '.join(words) audio_length = float(len(audio) / 2) / 11025 tsv_path = os.path.join(retain_dir, "retain.tsv") with open(tsv_path, "a") as tsv_file: tsv_file.write('\t'.join([ filename, text_type(audio_length), self.grammar.name, rule_name, text ]) + '\n') except: self.engine._log.exception("Exception retaining audio")