Extend Orchestra Python With Style
· 10 min read
Even a long journey starts with the first step. Making sense of all the data your company generates daily and all the external data sources you may need can definitely be a long journey. Hume Orchestra makes it easy to convert multiple distributed data sources into a single, connected source of truth: the knowledge graph.
Hume Orchestra uses an enterprise-grade stream technology to acquire, process and connect your data. This includes many specialised components you can lay out to describe your workflow. These components can read, manipulate and ultimately store billions of messages at scale.
Hume Orchestra supports a growing set of transformation components, including, among others, complex NLP enrichers and custom rest-based services enrichers. Here we want to talk about one of them, certainly one of the most used components: the message transformer.
The message transformer is a component that lets you execute arbitrary python code to alter the incoming messages.
The ability to put some specifically crafted snippets strategically gives you a very high degree of flexibility.
You can evaluate some complex logic before routing the message into the correct workflow section; you can preprocess textual data coming from the web or simply preprocess your message to simplify a Cypher query. The possibilities are endless.
Our message transformer component is powered by Jython, a Java-based python implementation as a library. Every command you run in a message transformer is actually interpreted and executed within the Java Virtual Machine instance that powers Orchestra itself.
Python notably comes with “battery included”, which means that it has a full set of built-in libraries. So far, our message transformer components support only modules coming from the python “standard library”.
Well, “so far” is not 100% true. In your message transformer, you can instruct the import machinery to scan extra folders, one of which can contain a textual file named something_new.py perhaps. Then another message transformer could import something_new. I think you get the point: with some hack, it is possible to extend the message transformer library with any pure python module.
We decided to encourage this, so in Hume 2.14.0, we introduced the support for extending the Orchestra Python. It allows you to use a particular mount volume where you can store any pure python packages. These packages will be available to your message transformers as modules you can import.
The system will automatically reload your packages, so any change to the module’s code will be applied during the next workflow run.
With great power comes great responsibility, and the message transformer is not an exception. Do you like responsibility? Because we are going to give you even more power!
Introducing Git integration
So far, to write and update your code, you have to access the machine running Orchestra directly. Sometimes this is perfectly fine, and sometimes it makes you ask yourself if it could be a little easier. Indeed it can be much easier with the help of a little tool called git-sync.
“git-sync is a simple command that pulls a git repository into a local directory. It is a perfect “sidecar” container in Kubernetes - it can periodically pull files down from a repository so that an application can consume them.”
Git-sync can happily live outside Kubernetes-based infrastructures; we can use it in our docker-compose-based Hume deployment to populate our special mount volume with the code coming from our online git repository.
Things are starting to get interesting here: you can now add this snippet in your message transformers:
from nlp_tools.preprocessing.names import fix_names, capitalisation_removal
This lets you use these useful preprocessing functions everywhere and just because they are useful, without even thinking about how they are implemented.
These functions can be defined in your git repo in this way:
# nlp_tools/preprocessing/names.py
def fix_names(text):
"""transform this "Dr.W.R.Robbins" into "Dr. W. R. Robbins" """
[...]
def capitalisation_removal(text):
""" replace capitalised tokens to help NER models:
"ABRAHAM FLEXNER" -> "Abraham Flexner"
BUT: do it only for clean alpha-tokens,
we treat person names like "I.J.Smith" elsewhere """
[...]
The specific implementations don’t have to be necessarily elegant as long as they behave as expected. This is true because you can continuously improve your solution and push it to your git repo. No longer than 30 seconds, your fresh code is ready to be used everywhere while your workflows stay the same.
Since we are talking about functions and behaviours, what about adding this:
# tests/nlp_tools/preprocessing/test_names.py
import unittest
from nlp_tools.preprocessing.names import fix_names, capitalisation_removal
class TestNamePreprocessing(unittest.TestCase):
def test_fix_names_fixes_names(self):
""" it should fix names ;) """
self.assertTrue(fix_names("Dr.W.R.Robbins"), "Dr. W. R. Robbins")
# todo add edge cases here
def test_capitalisation_removal(self):
""" it should remove clear name capitalisations """
self.assertTrue(capitalisation_removal("ABRAHAM FLEXNER"),
"Abraham Flexner")
# but
self.assertTrue(capitalisation_removal("I.J.Smith"), "I.J.Smith")
if __name__ == '__main__':
unittest.main()
Pairing some tests along with your utility functions is extremely useful for many reasons.
Obviously, tests can save you from regressions and make it easier to reproduce and fix bugs, but there’s more.
Tests like these make it clear the author’s intentions on how these functions are expected to be used and which results they are expected to give.
This aspect is very important in a collaborative environment but also when the code is yours and, when you come back after six months, you completely forgot the meaning of that odd regular expression.
Moreover, if your git repository supports Continuous Integration, these tests may run as a check before letting people merge with deploying branches.
Exploit the modules
We can push the thing a little bit further. As you may know, modules are singleton objects in Python. In a nutshell, they are executed once per process, just the first time they get imported. Any subsequent import gives you the very same instance of an object of some “module” type.
So can we use them as namespaces? Can we store some global state there? The answer is yes to both.
The code on the message transformer is executed within a function scope; we can actually use modules to overcome this limitation, producing side effects that last after the function execution.
To make things as easy as possible, let’s try to implement a very simple counter. Our tiny module’s duty is to track how often one of its functions is called.
The module can be as easy as :
#/sample/__init__.py
state = {
"counter": 0
}
def increment():
state["counter"] += 1
def getCounter():
return state["counter"]
We arbitrarily wrapped the state in a dictionary, but it could have been an object or something more complex and useful like a factory method.
Let’s use this code in a message transformer:
We can also use the two functions in two separate workflows, which will still work, so it can be used as a means for workflows to communicate.
Sadly this approach can not work as it is. If we restart the workflow, the modules get automatically reloaded, not just reimported. This automation is required to let you get the fresh code straight into your workflows without any manual operation.
As a consequence, if one of the workflows restarts, the module gets reimported, also causing the counter reset and thus making the module unable to store any state consistently.
To fix this, we need to take a step back and understand what it means to “reload” a module and what it means to import a module for the first time.
The first time you import a module, it gets executed statement by statement:
The same process holds for the rest of the code:
In a nutshell, every top-level definition in your file gets captured by this module reference, and this reference is exactly what you get in “sample” when you “import sample” in python.
It should be clear now that reloading a module basically means re-executing the module code while reusing the same module reference.
This means that both increment and getCounter are overwritten by the reload (without any interesting effect in our case). The state dictionary is also reinitialised, and the previous state is then lost.
We can overcome the issue by deferring the state initialisation, so state is not overwritten automatically on reload:
#/sample/__init__.py
[...]
def _init_my_state():
current_module = __import__(__name__)
if "my_state" not in dir(current_module):
current_module.my_state = {
"counter": 0
}
def my_increment():
_init_my_state()
my_state["counter"] += 1
def get_my_counter():
_init_my_state()
return my_state["counter"]
So now my_increment and get_my_counter can be used in the message transformer, and they will behave as expected.
Here this init_my_state function uses some quite unusual technique, and while it does precisely the required deferred state initialisation, I wouldn’t call it an elegant piece of code. Interestingly enough, it doesn’t really matter because these details are hidden from the end users. Users will just care about the increment function to behave consistently; implementation details can then evolve independently on how the functionalities are used.
Take away
We saw in this post how to use modules and, overall, how they make it easier to understand, design and maintain complex workflows.
We showed that decoupling the implementation and usage is beneficial: it makes improving the solutions without impacting the workflows easier.
We also saw that modules could be used as namespaces and status holders, even though some caution should be exercised.